// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <brpc/channel.h>
#include <butil/guid.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>

#include <algorithm>
#include <cctype>
#include <charconv>
#include <chrono>
#include <numeric>
#include <queue>
#include <regex>
#include <string>
#include <tuple>
#include <unordered_set>

#include "common/bvars.h"
#include "common/config.h"
#include "common/encryption_util.h"
#include "common/logging.h"
#include "common/network_util.h"
#include "common/stats.h"
#include "common/string_util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
#include "meta-store/keys.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"

using namespace std::chrono;

namespace {
// A storage vault name starts with an ASCII letter and continues with ASCII
// letters, digits or underscores.
constexpr char pattern_str[] = "^[a-zA-Z][0-9a-zA-Z_]*$";

// Returns true when `str` is a well-formed storage vault name (see pattern_str).
bool is_valid_storage_vault_name(const std::string& str) {
    // Compiling a std::regex is expensive, so build it once instead of on every call.
    static const std::regex pattern(pattern_str);
    return std::regex_match(str, pattern);
}
} // namespace

namespace doris::cloud {

// Maps a cluster status to its textual name. Every value without a dedicated
// name, ClusterStatus::UNKNOWN included, is reported as "UNKNOWN".
static std::string_view print_cluster_status(const ClusterStatus& status) {
    if (status == ClusterStatus::NORMAL) {
        return "NORMAL";
    }
    if (status == ClusterStatus::SUSPENDED) {
        return "SUSPENDED";
    }
    if (status == ClusterStatus::TO_RESUME) {
        return "TO_RESUME";
    }
    if (status == ClusterStatus::MANUAL_SHUTDOWN) {
        return "MANUAL_SHUTDOWN";
    }
    return "UNKNOWN";
}

// Encrypts a plain AK/SK pair with the newest encryption key.
//
// On success returns 0, stores the cipher pair in `cipher_ak_sk_pair` and records
// the key id and encryption method used in `encryption_info`.
// On failure returns -1 with `code` set to ERR_ENCRYPT and `msg` describing the
// failing step; the output arguments are then not meaningful.
//
// The plain secrets are taken by const reference so that calling this helper
// does not create extra copies of them.
static int encrypt_ak_sk_helper(const std::string& plain_ak, const std::string& plain_sk,
                                EncryptionInfoPB* encryption_info, AkSkPair* cipher_ak_sk_pair,
                                MetaServiceCode& code, std::string& msg) {
    std::string key;
    // Initialized so the sync-point callback never observes an indeterminate value.
    int64_t key_id = 0;
    LOG_INFO("enter encrypt_ak_sk_helper, plain_ak {}", hide_access_key(plain_ak));
    int ret = get_newest_encryption_key_for_ak_sk(&key_id, &key);
    TEST_SYNC_POINT_CALLBACK("encrypt_ak_sk:get_encryption_key", &ret, &key, &key_id);
    if (ret != 0) {
        msg = "failed to get encryption key";
        code = MetaServiceCode::ERR_ENCRYPT;
        LOG(WARNING) << msg;
        return -1;
    }
    auto& encryption_method = get_encryption_method_for_ak_sk();
    AkSkPair plain_ak_sk_pair {plain_ak, plain_sk};
    ret = encrypt_ak_sk(plain_ak_sk_pair, encryption_method, key, cipher_ak_sk_pair);
    if (ret != 0) {
        msg = "failed to encrypt";
        code = MetaServiceCode::ERR_ENCRYPT;
        LOG(WARNING) << msg;
        return -1;
    }
    encryption_info->set_key_id(key_id);
    encryption_info->set_encryption_method(encryption_method);
    return 0;
}

// Wraps the four-argument decrypt_ak_sk_helper overload: forwards its return
// value and, when that value is non-zero, additionally fills `code` and `msg`.
static int decrypt_ak_sk_helper(std::string_view cipher_ak, std::string_view cipher_sk,
                                const EncryptionInfoPB& encryption_info, AkSkPair* plain_ak_sk_pair,
                                MetaServiceCode& code, std::string& msg) {
    const int rc = decrypt_ak_sk_helper(cipher_ak, cipher_sk, encryption_info, plain_ak_sk_pair);
    if (rc == 0) {
        return rc;
    }
    code = MetaServiceCode::ERR_DECPYPT;
    msg = "failed to decrypt";
    return rc;
}

// Decrypts, in place, every AK/SK pair carried by `instance`:
//  - each obj_info that has encryption info,
//  - the ram_user when present and encrypted,
//  - the obj_info of each stage when present and encrypted.
// It also fills instance.iam_user: from the system arn-info key read through
// `txn` when that key exists (its AK/SK are decrypted), otherwise from the
// arn_* config values with `instance_id` as external id.
// Returns 0 on success, -1 on failure with `code` and `msg` set.
int decrypt_instance_info(InstanceInfoPB& instance, const std::string& instance_id,
                          MetaServiceCode& code, std::string& msg,
                          std::shared_ptr<Transaction>& txn) {
    // Swaps the cipher ak/sk held by `holder` for their plain counterparts.
    // Returns false when decryption fails (code/msg are filled by the helper).
    auto decrypt_in_place = [&code, &msg](auto& holder) -> bool {
        AkSkPair plain;
        if (decrypt_ak_sk_helper(holder.ak(), holder.sk(), holder.encryption_info(), &plain, code,
                                 msg) != 0) {
            return false;
        }
        holder.set_ak(std::move(plain.first));
        holder.set_sk(std::move(plain.second));
        return true;
    };

    for (auto& obj_info : *instance.mutable_obj_info()) {
        if (obj_info.has_encryption_info() && !decrypt_in_place(obj_info)) {
            return -1;
        }
    }

    if (instance.has_ram_user() && instance.ram_user().has_encryption_info()) {
        if (!decrypt_in_place(*instance.mutable_ram_user())) {
            return -1;
        }
    }

    std::string arn_info_val;
    const TxnErrorCode err = txn->get(system_meta_service_arn_info_key(), &arn_info_val);
    if (err == TxnErrorCode::TXN_OK) {
        RamUserPB stored_iam_user;
        if (!stored_iam_user.ParseFromString(arn_info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = "failed to parse RamUserPB";
            LOG(WARNING) << msg;
            return -1;
        }
        // The stored arn info is decrypted unconditionally.
        if (!decrypt_in_place(stored_iam_user)) {
            return -1;
        }
        instance.mutable_iam_user()->CopyFrom(stored_iam_user);
    } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // For compatibility, use arn_info of config
        RamUserPB config_iam_user;
        config_iam_user.set_user_id(config::arn_id);
        config_iam_user.set_external_id(instance_id);
        config_iam_user.set_ak(config::arn_ak);
        config_iam_user.set_sk(config::arn_sk);
        instance.mutable_iam_user()->CopyFrom(config_iam_user);
    } else {
        code = cast_as<ErrCategory::READ>(err);
        msg = fmt::format("failed to get arn_info_key, err={}", err);
        LOG(WARNING) << msg;
        return -1;
    }

    for (auto& stage : *instance.mutable_stages()) {
        if (!stage.has_obj_info() || !stage.obj_info().has_encryption_info()) {
            continue;
        }
        if (!decrypt_in_place(*stage.mutable_obj_info())) {
            return -1;
        }
    }
    return 0;
}

// Replaces the AK/SK of `obj_info` with their decrypted values when the object
// carries encryption info; otherwise leaves it untouched.
// Returns 0 on success (or when nothing had to be done); on decryption failure
// returns the helper's non-zero result with `code` and `msg` set.
static int decrypt_and_update_ak_sk(ObjectStoreInfoPB& obj_info, MetaServiceCode& code,
                                    std::string& msg) {
    if (!obj_info.has_encryption_info()) {
        return 0;
    }
    AkSkPair decrypted;
    const int rc = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(),
                                        &decrypted, code, msg);
    if (rc != 0) {
        return rc;
    }
    obj_info.set_ak(std::move(decrypted.first));
    obj_info.set_sk(std::move(decrypted.second));
    return 0;
}

// Asynchronously notify refresh instance in background thread
static void async_notify_refresh_instance(
        std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id, bool include_self,
        std::function<void(const KVStats&)> stats_handler = nullptr) {
    auto f = new std::function<void()>([instance_id, include_self, txn_kv = std::move(txn_kv),
                                        stats_handler = std::move(stats_handler)] {
        KVStats stats;
        notify_refresh_instance(txn_kv, instance_id, &stats, include_self);
        if (stats_handler) {
            stats_handler(stats);
        }
    });
    bthread_t bid;
    if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0) {
        LOG(WARNING) << "notify refresh instance inplace, instance_id=" << instance_id;
        run_bthread_work(f);
    }
}

// Collects the ids of the instances directly derived from `source_instance_id`
// by scanning its snapshot reference keys.
// `derived_ids` is cleared first; an empty source id yields an empty result.
// Keys that cannot be decoded (or decode to an empty id) are logged and skipped.
// Each batch of the scan is read through its own freshly created transaction.
// Returns 0 on success, -1 when `derived_ids` is null or a transaction/scan fails.
static int collect_direct_derived_instances(TxnKv* txn_kv, const std::string& source_instance_id,
                                            std::vector<std::string>* derived_ids) {
    if (derived_ids == nullptr) {
        return -1;
    }
    derived_ids->clear();
    if (source_instance_id.empty()) {
        return 0;
    }

    constexpr Versionstamp kMinVersionstamp = Versionstamp::min();
    const std::string scan_begin =
            versioned::snapshot_reference_key_prefix(source_instance_id, kMinVersionstamp);
    const std::string scan_end =
            versioned::snapshot_reference_key_prefix(source_instance_id + '\x00', kMinVersionstamp);

    std::string cursor = scan_begin;
    std::unique_ptr<RangeGetIterator> iter;
    for (;;) {
        std::unique_ptr<Transaction> scan_txn;
        TxnErrorCode err = txn_kv->create_txn(&scan_txn);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to create read transaction for snapshot reference scan, err="
                         << err << " source_instance_id=" << source_instance_id;
            return -1;
        }

        err = scan_txn->get(cursor, scan_end, &iter);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to scan snapshot reference keys, err=" << err
                         << " source_instance_id=" << source_instance_id;
            return -1;
        }

        while (iter->has_next()) {
            auto [key, value] = iter->next();
            std::string_view remaining = key;
            std::string decoded_id;
            const bool decoded =
                    versioned::decode_snapshot_ref_key(&remaining, nullptr, nullptr, &decoded_id);
            if (!decoded || decoded_id.empty()) {
                LOG(WARNING) << "failed to decode snapshot reference key for source_instance_id="
                             << source_instance_id << " key=" << hex(key);
                continue;
            }
            derived_ids->push_back(std::move(decoded_id));
        }

        if (!iter->more()) {
            break;
        }

        // Resume the next batch where this one stopped; stop if there is nowhere to resume.
        cursor = iter->next_begin_key();
        if (cursor.empty() || cursor >= scan_end) {
            break;
        }
    }

    return 0;
}

// Finds every instance derived, directly or transitively, from `root_instance_id`
// using a breadth-first traversal over collect_direct_derived_instances.
// Each discovered id is appended to `out` exactly once (the root itself is never
// appended, and `out` is not cleared first).
// Returns 0 on success, -1 when `out` is null or a level of the traversal fails.
static int find_cascade_instances(TxnKv* txn_kv, const std::string& root_instance_id,
                                  std::vector<std::string>* out) {
    // Validate the output pointer the same way collect_direct_derived_instances does.
    if (!out) {
        LOG(WARNING) << "find_cascade_instances called without an output vector, root: "
                     << root_instance_id;
        return -1;
    }

    std::queue<std::string> to_visit;
    std::unordered_set<std::string> visited;
    std::vector<std::string> direct_children;

    to_visit.push(root_instance_id);
    visited.insert(root_instance_id);

    while (!to_visit.empty()) {
        std::string current_instance_id = std::move(to_visit.front());
        to_visit.pop();

        if (collect_direct_derived_instances(txn_kv, current_instance_id, &direct_children) != 0) {
            LOG(WARNING) << "failed to collect derived instances for source_instance_id="
                         << current_instance_id;
            return -1;
        }

        for (auto& derived_id : direct_children) {
            // insert() reports whether the id is new, so one lookup covers both the
            // "already visited" test and the bookkeeping.
            if (derived_id.empty() || !visited.insert(derived_id).second) {
                continue;
            }

            out->push_back(derived_id);
            to_visit.push(derived_id);
            LOG(INFO) << "found derived instance: " << derived_id
                      << " from source: " << current_instance_id;
        }
    }

    LOG(INFO) << "find_cascade_instances completed, found " << out->size()
              << " instances to cascade from root: " << root_instance_id;
    return 0;
}

// Helper function to update AK/SK for a single instance.
//
// Applies `request` to the in-memory `instance`; nothing is persisted here.
//  - request->ram_user(), when present, replaces the AK/SK of the instance's
//    ram_user. The user_id must match the one already stored.
//  - Each request->internal_bucket_user() entry replaces the AK/SK of every
//    obj_info with the same user_id. An obj_info without a user_id (old format)
//    adopts the user_id of the entry, which is only allowed when the request
//    carries exactly one internal_bucket_user.
// New keys are encrypted with encrypt_ak_sk_helper before being stored, `time`
// is written as the mtime of every touched obj_info, and a description of each
// change (old and new cipher values) is appended to `update_record`.
//
// Returns 0 on success, -1 on error with `code` and `msg` set.
static int update_instance_ak_sk(InstanceInfoPB& instance, const UpdateAkSkRequest* request,
                                 uint64_t time, MetaServiceCode& code, std::string& msg,
                                 std::stringstream& update_record) {
    // Update ram_user if has ram_user
    if (request->has_ram_user()) {
        const auto& ram_user = request->ram_user();
        if (ram_user.user_id().empty() || ram_user.ak().empty() || ram_user.sk().empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "ram user info err " + proto_to_json(*request);
            return -1;
        }
        if (!instance.has_ram_user()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "instance doesn't have ram user info";
            return -1;
        }
        EncryptionInfoPB encryption_info;
        AkSkPair cipher_ak_sk_pair;
        if (encrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), &encryption_info, &cipher_ak_sk_pair,
                                 code, msg) != 0) {
            return -1;
        }
        const auto& [ak, sk] = cipher_ak_sk_pair;
        auto& instance_ram_user = *instance.mutable_ram_user();
        if (ram_user.user_id() != instance_ram_user.user_id()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "ram user_id err";
            return -1;
        }
        std::string old_ak = instance_ram_user.ak();
        std::string old_sk = instance_ram_user.sk();
        if (old_ak == ak && old_sk == sk) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "ak sk eq original, please check it";
            return -1;
        }
        // Copy rather than move: `ak`/`sk` alias cipher_ak_sk_pair and are still
        // needed for the update record below. Moving here would leave the record
        // printing moved-from strings instead of the new cipher values.
        instance_ram_user.set_ak(ak);
        instance_ram_user.set_sk(sk);
        instance_ram_user.mutable_encryption_info()->CopyFrom(encryption_info);
        update_record << "update ram_user's ak sk, instance_id: " << instance.instance_id()
                      << " user_id: " << ram_user.user_id() << " old:  cipher ak: " << old_ak
                      << " cipher sk: " << old_sk << " new: cipher ak: " << ak
                      << " cipher sk: " << sk << "; ";
    }

    // Update internal_bucket_user
    bool has_found_alter_obj_info = false;
    for (auto& alter_bucket_user : request->internal_bucket_user()) {
        if (!alter_bucket_user.has_ak() || !alter_bucket_user.has_sk() ||
            !alter_bucket_user.has_user_id()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 bucket info err " + proto_to_json(*request);
            return -1;
        }
        const std::string& user_id = alter_bucket_user.user_id();
        EncryptionInfoPB encryption_info;
        AkSkPair cipher_ak_sk_pair;
        if (encrypt_ak_sk_helper(alter_bucket_user.ak(), alter_bucket_user.sk(), &encryption_info,
                                 &cipher_ak_sk_pair, code, msg) != 0) {
            return -1;
        }
        const auto& [ak, sk] = cipher_ak_sk_pair;
        // `instance` is mutable, so use the mutable accessor instead of casting
        // away the constness of obj_info().
        for (auto& it : *instance.mutable_obj_info()) {
            if (!it.has_user_id()) {
                has_found_alter_obj_info = true;
                // For compatibility, obj_info without a user_id only allow
                // single internal_bucket_user to modify it.
                if (request->internal_bucket_user_size() != 1) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    msg = "fail to update old instance's obj_info, s3 obj info err " +
                          proto_to_json(*request);
                    return -1;
                }
                if (it.ak() == ak && it.sk() == sk) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    msg = "ak sk eq original, please check it";
                    return -1;
                }
                // Keep the previous cipher values for the record before overwriting them.
                std::string old_ak = it.ak();
                std::string old_sk = it.sk();
                it.set_mtime(time);
                it.set_user_id(user_id);
                it.set_ak(ak);
                it.set_sk(sk);
                it.mutable_encryption_info()->CopyFrom(encryption_info);
                update_record << "update obj_info's ak sk without user_id, instance_id: "
                              << instance.instance_id() << " obj_info_id: " << it.id()
                              << " new user_id: " << user_id << " old:  cipher ak: " << old_ak
                              << " cipher sk: " << old_sk << " new:  cipher ak: " << ak
                              << " cipher sk: " << sk << "; ";
                continue;
            }
            if (it.user_id() == user_id) {
                has_found_alter_obj_info = true;
                if (it.ak() == ak && it.sk() == sk) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    msg = "ak sk eq original, please check it";
                    return -1;
                }
                std::string old_ak = it.ak();
                std::string old_sk = it.sk();
                it.set_mtime(time);
                it.set_ak(ak);
                it.set_sk(sk);
                it.mutable_encryption_info()->CopyFrom(encryption_info);
                update_record << "update obj_info's ak sk, instance_id: " << instance.instance_id()
                              << " obj_info_id: " << it.id() << " user_id: " << user_id
                              << " old:  cipher ak: " << old_ak << " cipher sk: " << old_sk
                              << " new:  cipher ak: " << ak << " cipher sk: " << sk << "; ";
            }
        }
    }

    // The request named bucket users but none of them matched an obj_info.
    if (!request->internal_bucket_user().empty() && !has_found_alter_obj_info) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "fail to find the alter obj info";
        return -1;
    }

    return 0;
}

// RPC handler returning the object store infos and storage vaults of the instance
// resolved from request->cloud_unique_id().
//
// The response carries: enable_storage_vault, every storage vault stored under the
// instance (only scanned when the instance has resource ids), the instance's
// obj_info list, and the default storage vault id/name when one is set. AK/SK of
// obj_infos and of vault obj_infos are decrypted before being returned.
// Failures are reported by setting `code`/`msg` and returning early.
void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* controller,
                                         const GetObjStoreInfoRequest* request,
                                         GetObjStoreInfoResponse* response,
                                         ::google::protobuf::Closure* done) {
    // NOTE(review): `code`, `msg`, `instance_id`, `txn` and `ss` used below are not
    // declared in this function; presumably RPC_PREPROCESS introduces them — confirm
    // against the macro definition.
    RPC_PREPROCESS(get_obj_store_info, get);
    TEST_SYNC_POINT_CALLBACK("obj-store-info_sk_response", &response);
    TEST_SYNC_POINT_RETURN_WITH_VOID("obj-store-info_sk_response_return");
    // Prepare data
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    // Resolve the instance the caller belongs to.
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_obj_store_info)
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    // Load and parse the instance record.
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }
    // Decrypt the AK/SK of every obj_info in place before copying them to the response.
    for (auto& obj_info : *instance.mutable_obj_info()) {
        if (auto ret = decrypt_and_update_ak_sk(obj_info, code, msg); ret != 0) {
            return;
        }
    }

    response->set_enable_storage_vault(instance.enable_storage_vault());

    // Iterate all the resources to return to the rpc caller
    if (!instance.resource_ids().empty()) {
        // Range covering every storage vault key of this instance.
        std::string storage_vault_start = storage_vault_key({instance.instance_id(), ""});
        std::string storage_vault_end = storage_vault_key({instance.instance_id(), "\xff"});
        std::unique_ptr<RangeGetIterator> it;
        // The range read may come back in several batches; keep reading while the
        // iterator reports more data.
        do {
            // Note: this `err` shadows the function-level `err` declared above.
            TxnErrorCode err = txn->get(storage_vault_start, storage_vault_end, &it);
            if (err != TxnErrorCode::TXN_OK) {
                code = cast_as<ErrCategory::READ>(err);
                msg = fmt::format("internal error, failed to get storage vault, err={}", err);
                LOG(WARNING) << msg;
                return;
            }

            while (it->has_next()) {
                auto [k, v] = it->next();
                // The vault is appended to the response before parsing, so a parse
                // failure leaves a partially filled entry in the response.
                auto* vault = response->add_storage_vault();
                if (!vault->ParseFromArray(v.data(), v.size())) {
                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                    msg = fmt::format("malformed storage vault, unable to deserialize key={}",
                                      hex(k));
                    LOG(WARNING) << msg << " key=" << hex(k);
                    return;
                }
                // Remember the last key of this batch as the resume point.
                if (!it->has_next()) {
                    storage_vault_start = k;
                }
            }
            storage_vault_start.push_back('\x00'); // Update to next smallest key for iteration
        } while (it->more());
    }
    // Decrypt the AK/SK of every vault that is backed by an object store.
    for (auto& vault : *response->mutable_storage_vault()) {
        if (vault.has_obj_info()) {
            if (auto ret = decrypt_and_update_ak_sk(*vault.mutable_obj_info(), code, msg);
                ret != 0) {
                return;
            }
        }
    }

    response->mutable_obj_info()->CopyFrom(instance.obj_info());
    if (instance.has_default_storage_vault_id()) {
        response->set_default_storage_vault_id(instance.default_storage_vault_id());
        response->set_default_storage_vault_name(instance.default_storage_vault_name());
    }
}

// The next available vault id would be max(max(obj info id), max(vault id)) + 1.
static std::string next_available_vault_id(const InstanceInfoPB& instance) {
    int vault_id = 0;
    auto max = [](int prev, const auto& last) {
        int last_id = 0;
        std::string_view value = "0";
        if constexpr (std::is_same_v<std::decay_t<decltype(last)>, ObjectStoreInfoPB>) {
            value = last.id();
        } else if constexpr (std::is_same_v<std::decay_t<decltype(last)>, std::string>) {
            value = last;
        }
        if (auto [_, ec] = std::from_chars(value.data(), value.data() + value.size(), last_id);
            ec != std::errc {}) [[unlikely]] {
            LOG_WARNING("Invalid resource id format: {}", value);
            last_id = 0;
            DCHECK(false);
        }
        return std::max(prev, last_id);
    };
    auto prev = std::accumulate(
            instance.resource_ids().begin(), instance.resource_ids().end(),
            std::accumulate(instance.obj_info().begin(), instance.obj_info().end(), vault_id, max),
            max);
    return std::to_string(prev + 1);
}

namespace detail {

// Checks an hdfs prefix and normalizes it in place with trim().
// An empty prefix is accepted unchanged; a prefix containing "://" (a scheme)
// is rejected without being modified. Returns true when the prefix is accepted.
bool normalize_hdfs_prefix(std::string& prefix) {
    // Should not contain scheme
    if (prefix.find("://") != std::string::npos) {
        return false;
    }
    if (!prefix.empty()) {
        trim(prefix);
    }
    return true;
}

// Checks an hdfs fs_name and normalizes it in place with trim().
// Returns true only when the name is non-empty both before and after trimming.
bool normalize_hdfs_fs_name(std::string& fs_name) {
    // Should check scheme existence?
    if (!fs_name.empty()) {
        trim(fs_name);
    }
    return !fs_name.empty();
}

} // namespace detail

// Registers a new HDFS storage vault for `instance`.
// Validates and normalizes the prefix and fs_name of `hdfs_param` (optionally
// appending a GUID to the prefix when config::enable_distinguish_hdfs_path is on),
// assigns the next available vault id, writes the serialized vault through `txn`
// and appends the id/name to the instance's resource_ids/storage_vault_names.
// When built without ENABLE_HDFS_STORAGE_VAULT the call always fails.
// Returns 0 on success, -1 on failure with `code` and `msg` set.
static int add_hdfs_storage_vault(InstanceInfoPB& instance, Transaction* txn,
                                  StorageVaultPB& hdfs_param, MetaServiceCode& code,
                                  std::string& msg) {
#ifndef ENABLE_HDFS_STORAGE_VAULT
    code = MetaServiceCode::INVALID_ARGUMENT;
    msg = fmt::format(
            "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
            "but HDFS storage vaults were detected: {}",
            hdfs_param.name());
    LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
               << "but HDFS storage vaults were detected: " << hdfs_param.name();
    return -1;
#endif

    if (!hdfs_param.has_hdfs_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = fmt::format("vault_name={} passed invalid argument", hdfs_param.name());
        return -1;
    }

    // Check and normalize hdfs conf
    std::string* path_prefix = hdfs_param.mutable_hdfs_info()->mutable_prefix();
    if (!detail::normalize_hdfs_prefix(*path_prefix)) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = fmt::format("invalid prefix: {}", *path_prefix);
        return -1;
    }
    if (config::enable_distinguish_hdfs_path) {
        const auto guid = butil::GenerateGUID();
        if (guid.empty()) [[unlikely]] {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = fmt::format("failed to generate one suffix for hdfs prefix: {}", *path_prefix);
            return -1;
        }
        *path_prefix = fmt::format("{}_{}", *path_prefix, guid);
    }

    std::string* name_node = hdfs_param.mutable_hdfs_info()->mutable_build_conf()->mutable_fs_name();
    if (!detail::normalize_hdfs_fs_name(*name_node)) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = fmt::format("invalid fs_name: {}", *name_node);
        return -1;
    }

    // Persist the vault under a freshly allocated id.
    std::string new_vault_id = next_available_vault_id(instance);
    std::string kv_key;
    storage_vault_key({instance.instance_id(), new_vault_id}, &kv_key);
    hdfs_param.set_id(new_vault_id);
    const std::string kv_val = hdfs_param.SerializeAsString();
    txn->put(kv_key, kv_val);
    LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", new_vault_id,
             hdfs_param.name(), hex(kv_key));

    // Keep the instance's id and name lists in step with each other.
    instance.mutable_resource_ids()->Add(std::move(new_vault_id));
    *instance.mutable_storage_vault_names()->Add() = hdfs_param.name();
    return 0;
}

static void create_object_info_with_encrypt(const InstanceInfoPB& instance, ObjectStoreInfoPB* obj,
                                            bool sse_enabled, MetaServiceCode& code,
                                            std::string& msg) {
    std::string plain_ak = obj->has_ak() ? obj->ak() : "";
    std::string plain_sk = obj->has_sk() ? obj->sk() : "";
    std::string bucket = obj->has_bucket() ? obj->bucket() : "";
    std::string prefix = obj->has_prefix() ? obj->prefix() : "";
    // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, `  /aa/bb` -> `aa/bb`
    trim(prefix);
    std::string endpoint = obj->has_endpoint() ? obj->endpoint() : "";
    std::string external_endpoint = obj->has_external_endpoint() ? obj->external_endpoint() : "";
    std::string region = obj->has_region() ? obj->region() : "";

    if (obj->has_role_arn()) {
        if (obj->role_arn().empty() || !obj->has_cred_provider_type() ||
            obj->cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE ||
            !obj->has_provider() || obj->provider() != ObjectStoreInfoPB::S3 || bucket.empty() ||
            endpoint.empty() || region.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err with role_arn, please check it";
            return;
        }
    } else {
        // ATTN: prefix may be empty
        if (plain_ak.empty() || plain_sk.empty() || bucket.empty() || endpoint.empty() ||
            region.empty() || !obj->has_provider() || external_endpoint.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err, please check it";
            return;
        }

        EncryptionInfoPB encryption_info;
        AkSkPair cipher_ak_sk_pair;
        auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair,
                                        code, msg);
        TEST_SYNC_POINT_CALLBACK("create_object_info_with_encrypt", &ret, &code, &msg);
        if (ret != 0) {
            return;
        }
        obj->set_ak(std::move(cipher_ak_sk_pair.first));
        obj->set_sk(std::move(cipher_ak_sk_pair.second));
        obj->mutable_encryption_info()->CopyFrom(encryption_info);
    }

    obj->set_bucket(bucket);
    obj->set_prefix(prefix);
    obj->set_endpoint(endpoint);
    obj->set_external_endpoint(external_endpoint);
    obj->set_region(region);
    obj->set_id(next_available_vault_id(instance));
    auto now_time = std::chrono::system_clock::now();
    uint64_t time =
            std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
    obj->set_ctime(time);
    obj->set_mtime(time);
    obj->set_sse_enabled(sse_enabled);
}

// Adds a new storage vault described by `vault_param` to `instance`.
// Fails with ALREADY_EXISTED when a vault of the same name is already registered.
// HDFS vaults are delegated to add_hdfs_storage_vault. For every other vault the
// obj_info is validated/encrypted by create_object_info_with_encrypt, the vault
// takes the id assigned to its obj_info, the id/name are appended to the
// instance's resource_ids/storage_vault_names and the serialized vault is
// written through `txn`.
// Returns 0 on success, -1 on failure with `code` and `msg` set.
// NOTE(review): failure of create_object_info_with_encrypt is detected through
// `code != OK`, so this presumably expects `code` to be OK on entry — confirm.
static int add_vault_into_instance(InstanceInfoPB& instance, Transaction* txn,
                                   StorageVaultPB& vault_param, MetaServiceCode& code,
                                   std::string& msg) {
    const auto& existing_names = instance.storage_vault_names();
    if (std::find(existing_names.begin(), existing_names.end(), vault_param.name()) !=
        existing_names.end()) {
        code = MetaServiceCode::ALREADY_EXISTED;
        msg = fmt::format("vault_name={} already created", vault_param.name());
        return -1;
    }

    if (vault_param.has_hdfs_info()) {
        return add_hdfs_storage_vault(instance, txn, vault_param, code, msg);
    }

    // The obj_info is completed in place, so no copy back into vault_param is needed
    // (the former self-CopyFrom was a no-op).
    create_object_info_with_encrypt(instance, vault_param.mutable_obj_info(), true, code, msg);
    if (code != MetaServiceCode::OK) {
        return -1;
    }

    vault_param.set_id(vault_param.obj_info().id());
    auto vault_key = storage_vault_key({instance.instance_id(), vault_param.obj_info().id()});
    *instance.mutable_resource_ids()->Add() = vault_param.id();
    *instance.mutable_storage_vault_names()->Add() = vault_param.name();
    LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault_param.id(),
             vault_param.name(), hex(vault_key));
    txn->put(vault_key, vault_param.SerializeAsString());
    return 0;
}

// Removes the storage vault named hdfs_info.name() from `instance`.
// The vault id is looked up at the same position in resource_ids as the name
// occupies in storage_vault_names; the vault key is removed through `txn` and
// the name/id entries are dropped from the instance.
// Returns 0 on success, -1 with STORAGE_VAULT_NOT_FOUND when the name is unknown
// or when no id exists at the matching position.
static int remove_hdfs_storage_vault(InstanceInfoPB& instance, Transaction* txn,
                                     const StorageVaultPB& hdfs_info, MetaServiceCode& code,
                                     std::string& msg) {
    std::string_view vault_name = hdfs_info.name();
    auto name_iter = std::find_if(instance.storage_vault_names().begin(),
                                  instance.storage_vault_names().end(),
                                  [&](const auto& name) { return vault_name == name; });
    if (name_iter == instance.storage_vault_names().end()) {
        code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND;
        msg = fmt::format("vault_name={} not found", vault_name);
        return -1;
    }
    auto vault_idx = name_iter - instance.storage_vault_names().begin();
    // The two lists are indexed in parallel; refuse to read past the end of
    // resource_ids if they have fallen out of step.
    if (vault_idx >= instance.resource_ids_size()) {
        code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND;
        msg = fmt::format("vault_name={} has no matching vault id", vault_name);
        LOG(WARNING) << msg << ", instance_id=" << instance.instance_id();
        return -1;
    }
    auto vault_id_iter = instance.resource_ids().begin() + vault_idx;
    std::string_view vault_id = *vault_id_iter;
    // Build the key before the id entry is deleted: vault_id only views that entry.
    std::string vault_key = storage_vault_key({instance.instance_id(), vault_id});

    txn->remove(vault_key);
    instance.mutable_storage_vault_names()->DeleteSubrange(vault_idx, 1);
    instance.mutable_resource_ids()->DeleteSubrange(vault_idx, 1);
    LOG(INFO) << "remove storage_vault_key=" << hex(vault_key);

    return 0;
}

// Log vault message and origin default storage vault message for potential tracing.
//
// Emits a single INFO line containing the instance id plus the name and id of the vault that
// is about to become the default. When the instance already has a default storage vault id,
// the name and id of that previous default are appended to the same line.
//
// `vault_name` / `vault_id` identify the vault being promoted to default.
static void set_default_vault_log_helper(const InstanceInfoPB& instance,
                                         std::string_view vault_name, std::string_view vault_id) {
    // Argument order must follow the placeholders: "... default vault as <name>, id <id>".
    auto vault_msg = fmt::format("instance {} tries to set default vault as {}, id {}",
                                 instance.instance_id(), vault_name, vault_id);
    if (instance.has_default_storage_vault_id()) {
        vault_msg = fmt::format("{}, origin default vault name {}, vault id {}", vault_msg,
                                instance.default_storage_vault_name(),
                                instance.default_storage_vault_id());
    }
    LOG(INFO) << vault_msg;
}

// Returns true when `new_vault_name` is already listed in `instance.storage_vault_names()`.
static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_vault_name) {
    const auto& taken_names = instance.storage_vault_names();
    return std::find(taken_names.begin(), taken_names.end(), new_vault_name) !=
           taken_names.end();
}

// Applies an ALTER request to an existing HDFS storage vault of `instance`.
//
// `vault` carries the current vault name (`name`) plus the fields to change: optionally
// `alter_name` (rename) and, inside `hdfs_info.build_conf`, the kerberos keytab, the kerberos
// principal, the user and additional hdfs confs (appended to the stored ones). Requests that
// set `prefix` or `fs_name`, or that carry no `build_conf`, are rejected.
//
// The stored StorageVaultPB is read through `txn`, patched and put back under the same key.
// A rename is also written into `instance.storage_vault_names()`, but only after every
// fallible step has succeeded, so `instance` is untouched on any error return. This function
// neither persists `instance` nor commits `txn`.
//
// Returns 0 on success and sets `response->storage_vault_id`; otherwise returns -1 with `code`
// and `msg` describing the failure.
static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
                                    const StorageVaultPB& vault, MetaServiceCode& code,
                                    std::string& msg, AlterObjStoreInfoResponse* response) {
    if (!vault.has_hdfs_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "There is no hdfs vault provided";
        return -1;
    }
    const auto& hdfs_info = vault.hdfs_info();
    if (hdfs_info.has_prefix() || !hdfs_info.has_build_conf() ||
        hdfs_info.build_conf().has_fs_name()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "You can not alter prefix or fs name because it might lose previoud written data";
        return -1;
    }
    const auto& name = vault.name();
    // Here we try to get mutable iter since we might need to alter the vault name
    auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(),
                                 instance.mutable_storage_vault_names()->end(),
                                 [&](const auto& vault_name) { return name == vault_name; });
    if (name_itr == instance.storage_vault_names().end()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid storage vault name, not found, name =" + name;
        return -1;
    }
    // The vault id is stored in resource_ids at the same index as the name.
    auto pos = name_itr - instance.storage_vault_names().begin();
    std::string vault_id = instance.resource_ids().begin()[pos];
    auto vault_key = storage_vault_key({instance.instance_id(), vault_id});
    std::string val;

    auto err = txn->get(vault_key, &val);
    LOG(INFO) << "get vault_key=" << hex(vault_key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        std::stringstream ss;
        ss << "failed to get storage vault, vault_id=" << vault_id << ", vault_name="
           << "" << name << " err=" << err;
        msg = ss.str();
        return -1;
    }
    StorageVaultPB new_vault;
    // A value that cannot be decoded must not be patched and written back.
    if (!new_vault.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = fmt::format("failed to parse StorageVaultPB, vault_id={}, vault_name={}", vault_id,
                          name);
        return -1;
    }

    if (!new_vault.has_hdfs_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = name + " is not hdfs storage vault";
        return -1;
    }

    auto origin_vault_info = new_vault.DebugString();
    if (vault.has_alter_name()) {
        if (!is_valid_storage_vault_name(vault.alter_name())) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            std::stringstream ss;
            ss << "invalid storage vault name =" << vault.alter_name() << " the name must satisfy "
               << pattern_str;
            msg = ss.str();
            return -1;
        }

        if (vault_exist(instance, vault.alter_name())) {
            code = MetaServiceCode::ALREADY_EXISTED;
            msg = fmt::format("vault_name={} already existed", vault.alter_name());
            return -1;
        }

        // Only the local copy is renamed here; `instance` is updated once nothing can fail.
        new_vault.set_name(vault.alter_name());
    }
    auto* alter_hdfs_info = new_vault.mutable_hdfs_info();
    if (hdfs_info.build_conf().has_hdfs_kerberos_keytab()) {
        alter_hdfs_info->mutable_build_conf()->set_hdfs_kerberos_keytab(
                hdfs_info.build_conf().hdfs_kerberos_keytab());
    }
    if (hdfs_info.build_conf().has_hdfs_kerberos_principal()) {
        alter_hdfs_info->mutable_build_conf()->set_hdfs_kerberos_principal(
                hdfs_info.build_conf().hdfs_kerberos_principal());
    }
    if (hdfs_info.build_conf().has_user()) {
        alter_hdfs_info->mutable_build_conf()->set_user(hdfs_info.build_conf().user());
    }
    if (0 != hdfs_info.build_conf().hdfs_confs_size()) {
        // Appends the requested confs to the ones already stored.
        alter_hdfs_info->mutable_build_conf()->mutable_hdfs_confs()->Add(
                hdfs_info.build_conf().hdfs_confs().begin(),
                hdfs_info.build_conf().hdfs_confs().end());
    }
    auto new_vault_info = new_vault.DebugString();

    val = new_vault.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return -1;
    }

    // Every fallible step is done: publish the rename into the instance's name list together
    // with the vault KV update so the two never disagree on an error return.
    if (vault.has_alter_name()) {
        *name_itr = vault.alter_name();
    }
    txn->put(vault_key, val);
    LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
              << ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info;

    DCHECK_EQ(new_vault.id(), vault_id);
    response->set_storage_vault_id(new_vault.id());
    return 0;
}

// Applies an ALTER request to an existing S3 (object store) storage vault of `instance`.
//
// `vault` carries the current vault name (`name`) plus the fields to change: optionally
// `alter_name` (rename) and, inside `obj_info`, either a new ak/sk pair (both must be given)
// or a role_arn (with optional external_id), and optionally `use_path_style`. Setting ak/sk
// clears the stored role_arn credentials and vice versa. Requests that set bucket, endpoint,
// prefix or provider are rejected. The vault's `mtime` is refreshed on every successful alter.
//
// The stored StorageVaultPB is read through `txn`, patched and put back under the same key.
// A rename is also written into `instance.storage_vault_names()`, but only after every
// fallible step has succeeded, so `instance` is untouched on any error return. This function
// neither persists `instance` nor commits `txn`.
//
// Returns 0 on success and sets `response->storage_vault_id`; otherwise returns -1 with `code`
// and `msg` describing the failure.
static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
                                  const StorageVaultPB& vault, MetaServiceCode& code,
                                  std::string& msg, AlterObjStoreInfoResponse* response) {
    if (!vault.has_obj_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "There is no s3 vault provided";
        return -1;
    }
    const auto& obj_info = vault.obj_info();
    if (obj_info.has_bucket() || obj_info.has_endpoint() || obj_info.has_prefix() ||
        obj_info.has_provider()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Bucket, endpoint, prefix and provider can not be altered";
        return -1;
    }

    const auto& name = vault.name();
    // Here we try to get mutable iter since we might need to alter the vault name
    auto name_itr = std::find_if(instance.mutable_storage_vault_names()->begin(),
                                 instance.mutable_storage_vault_names()->end(),
                                 [&](const auto& vault_name) { return name == vault_name; });
    if (name_itr == instance.storage_vault_names().end()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid storage vault name, not found, name =" + name;
        return -1;
    }
    // The vault id is stored in resource_ids at the same index as the name.
    auto pos = name_itr - instance.storage_vault_names().begin();
    std::string vault_id = instance.resource_ids().begin()[pos];
    auto vault_key = storage_vault_key({instance.instance_id(), vault_id});
    std::string val;

    auto err = txn->get(vault_key, &val);
    LOG(INFO) << "get vault_key=" << hex(vault_key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        std::stringstream ss;
        ss << "failed to get storage vault, vault_id=" << vault_id << ", vault_name="
           << "" << name << " err=" << err;
        msg = ss.str();
        return -1;
    }
    StorageVaultPB new_vault;
    // A value that cannot be decoded must not be patched and written back.
    if (!new_vault.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = fmt::format("failed to parse StorageVaultPB, vault_id={}, vault_name={}", vault_id,
                          name);
        return -1;
    }
    if (!new_vault.has_obj_info()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = name + " is not s3 storage vault";
        return -1;
    }

    auto origin_vault_info = new_vault.DebugString();

    if (vault.has_alter_name()) {
        if (!is_valid_storage_vault_name(vault.alter_name())) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            std::stringstream ss;
            ss << "invalid storage vault name =" << vault.alter_name() << " the name must satisfy "
               << pattern_str;
            msg = ss.str();
            return -1;
        }

        if (vault_exist(instance, vault.alter_name())) {
            code = MetaServiceCode::ALREADY_EXISTED;
            msg = fmt::format("vault_name={} already existed", vault.alter_name());
            return -1;
        }

        // Only the local copy is renamed here. The credential checks and the encryption below
        // can still fail, so `instance` is updated once nothing can fail anymore.
        new_vault.set_name(vault.alter_name());
    }

    if (obj_info.has_role_arn() && (obj_info.has_ak() || obj_info.has_sk())) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invaild argument, both set ak/sk and role_arn is not allowed";
        LOG(WARNING) << msg;
        return -1;
    }

    if (obj_info.has_ak() ^ obj_info.has_sk()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Accesskey and secretkey must be alter together";
        return -1;
    }

    if (obj_info.has_ak()) {
        EncryptionInfoPB encryption_info = new_vault.obj_info().encryption_info();
        AkSkPair new_ak_sk_pair {new_vault.obj_info().ak(), new_vault.obj_info().sk()};

        // ak and sk must be altered together, there is check before.
        auto ret = encrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), &encryption_info,
                                        &new_ak_sk_pair, code, msg);
        if (ret != 0) {
            msg = "failed to encrypt";
            code = MetaServiceCode::ERR_ENCRYPT;
            LOG(WARNING) << msg;
            return -1;
        }
        // Switching to ak/sk credentials: drop any role based credentials stored before.
        new_vault.mutable_obj_info()->clear_role_arn();
        new_vault.mutable_obj_info()->clear_external_id();
        new_vault.mutable_obj_info()->clear_cred_provider_type();

        new_vault.mutable_obj_info()->set_ak(new_ak_sk_pair.first);
        new_vault.mutable_obj_info()->set_sk(new_ak_sk_pair.second);
        new_vault.mutable_obj_info()->mutable_encryption_info()->CopyFrom(encryption_info);
    }

    if (obj_info.has_role_arn()) {
        // Switching to role based credentials: drop any ak/sk stored before.
        new_vault.mutable_obj_info()->clear_ak();
        new_vault.mutable_obj_info()->clear_sk();
        new_vault.mutable_obj_info()->clear_encryption_info();

        new_vault.mutable_obj_info()->set_role_arn(obj_info.role_arn());
        new_vault.mutable_obj_info()->set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
        if (obj_info.has_external_id()) {
            new_vault.mutable_obj_info()->set_external_id(obj_info.external_id());
        }
    }

    if (obj_info.has_use_path_style()) {
        new_vault.mutable_obj_info()->set_use_path_style(obj_info.use_path_style());
    }

    auto now_time = std::chrono::system_clock::now();
    uint64_t time =
            std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
    new_vault.mutable_obj_info()->set_mtime(time);

    auto new_vault_info = new_vault.DebugString();
    val = new_vault.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return -1;
    }

    // Every fallible step is done: publish the rename into the instance's name list together
    // with the vault KV update so the two never disagree on an error return.
    if (vault.has_alter_name()) {
        *name_itr = vault.alter_name();
    }
    txn->put(vault_key, val);
    LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
              << ", origin vault=" << encryt_sk(hide_ak(origin_vault_info))
              << ", new vault=" << encryt_sk(hide_ak(new_vault_info));

    DCHECK_EQ(new_vault.id(), vault_id);
    response->set_storage_vault_id(new_vault.id());
    return 0;
}

// Bundle of references to caller-owned variables that describe one object storage entry.
//
// Every member is a reference, so an ObjectStorageDesc owns nothing: assigning through a
// member writes into the variable it was bound to when the struct was built.
//
// The member order is significant. The struct is created positionally via aggregate
// initialization and unpacked positionally via structured bindings in this file, so members
// must not be reordered, added or removed without updating those sites.
struct ObjectStorageDesc {
    // Access key / secret key. extract_object_storage_info() assigns them from
    // `cipher_ak_sk_pair` when the request carries no role_arn.
    std::string& ak;
    std::string& sk;
    std::string& bucket;
    std::string& prefix; // object_info_pb_factory() trims this in place
    std::string& endpoint;
    std::string& external_endpoint;
    std::string& region;
    bool& use_path_style;

    // Role based credentials, assigned instead of ak/sk when the request carries a role_arn.
    std::string& role_arn;
    std::string& external_id;
};

static int extract_object_storage_info(const AlterObjStoreInfoRequest* request,
                                       MetaServiceCode& code, std::string& msg,
                                       ObjectStorageDesc& obj_desc,
                                       EncryptionInfoPB& encryption_info,
                                       AkSkPair& cipher_ak_sk_pair) {
    if (!request->has_obj() && (!request->has_vault() || !request->vault().has_obj_info())) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "s3 obj info err " + proto_to_json(*request);
        return -1;
    }

    const auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();

    //  obj size > 1k, refuse
    if (obj.ByteSizeLong() > 1024) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "s3 obj info greater than 1k " + proto_to_json(*request);
        return -1;
    };

    auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region, use_path_style, role_arn,
           external_id] = obj_desc;

    if (!obj.has_role_arn()) {
        if (!obj.has_ak() || !obj.has_sk()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 obj info err " + proto_to_json(*request);
            LOG(INFO) << msg;
            return -1;
        }

        std::string plain_ak = obj.has_ak() ? obj.ak() : "";
        std::string plain_sk = obj.has_sk() ? obj.sk() : "";
        auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair,
                                        code, msg);
        if (ret != 0) {
            return -1;
        }

        ak = cipher_ak_sk_pair.first;
        sk = cipher_ak_sk_pair.second;
    } else {
        if (obj.has_ak() || obj.has_sk()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "invaild argument, both set ak/sk and role_arn is not allowed";
            return -1;
        }

        role_arn = obj.has_role_arn() ? obj.role_arn() : "";
        external_id = obj.has_external_id() ? obj.external_id() : "";
    }
    TEST_SYNC_POINT_CALLBACK("extract_object_storage_info:get_aksk_pair", &cipher_ak_sk_pair);
    bucket = obj.has_bucket() ? obj.bucket() : "";
    prefix = obj.has_prefix() ? obj.prefix() : "";
    endpoint = obj.has_endpoint() ? obj.endpoint() : "";
    external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : "";
    region = obj.has_region() ? obj.region() : "";
    use_path_style = obj.use_path_style();
    return 0;
}

// Builds a new ObjectStoreInfoPB from the already extracted description `obj_desc`.
//
//  - id comes from next_available_vault_id(instance); ctime and mtime are set to the current
//    time in seconds since the epoch.
//  - user_id and provider are taken from the request side `obj`.
//  - Credentials: when `obj` has a role_arn, role_arn / external_id from `obj_desc` are used
//    and the credential provider type is set to INSTANCE_PROFILE; otherwise the ak/sk are
//    moved out of `cipher_ak_sk_pair` and `encryption_info` is copied in.
//  - sse_enabled is inherited from `instance`.
//
// Side effects: `obj_desc.prefix` is trimmed in place, and `cipher_ak_sk_pair` is left in a
// moved-from state on the ak/sk path.
static ObjectStoreInfoPB object_info_pb_factory(ObjectStorageDesc& obj_desc,
                                                const ObjectStoreInfoPB& obj,
                                                InstanceInfoPB& instance,
                                                EncryptionInfoPB& encryption_info,
                                                AkSkPair& cipher_ak_sk_pair) {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const uint64_t now_sec = std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();

    ObjectStoreInfoPB built;
    built.set_ctime(now_sec);
    built.set_mtime(now_sec);
    built.set_id(next_available_vault_id(instance));
    if (obj.has_user_id()) {
        built.set_user_id(obj.user_id());
    }

    if (obj.has_role_arn()) {
        built.set_role_arn(obj_desc.role_arn);
        built.set_external_id(obj_desc.external_id);
        built.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
    } else {
        built.set_ak(std::move(cipher_ak_sk_pair.first));
        built.set_sk(std::move(cipher_ak_sk_pair.second));
        built.mutable_encryption_info()->CopyFrom(encryption_info);
    }

    built.set_bucket(obj_desc.bucket);
    // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, `  /aa/bb` -> `aa/bb`
    trim(obj_desc.prefix);
    built.set_prefix(obj_desc.prefix);
    built.set_endpoint(obj_desc.endpoint);
    built.set_external_endpoint(obj_desc.external_endpoint);
    built.set_region(obj_desc.region);
    built.set_provider(obj.provider());
    built.set_sse_enabled(instance.sse_enabled());
    built.set_use_path_style(obj_desc.use_path_style);

    return built;
}

// RPC handler that adds, drops, alters, or sets/unsets as default a storage vault of the
// instance identified by `request->cloud_unique_id()`.
//
// Flow:
//   1. Per-op validation of the request (first switch). For the S3 add op the object store
//      info is extracted (and ak/sk encrypted) into the local variables below.
//   2. The instance id is resolved from the cloud unique id and the InstanceInfoPB is loaded
//      in a freshly created transaction. Deleted instances are rejected.
//   3. The op is applied to the in-memory `instance` and to the vault KVs (second switch).
//   4. `instance` is written back, the instance-update counter is bumped and the transaction
//      is committed.
//
// Failures are reported by setting `code` / `msg` and returning early; nothing is committed in
// that case. `code`, `msg`, `ss`, `txn` and `instance_id` are not declared in this function
// body; presumably they are introduced by the RPC_PREPROCESS macro, which is also expected to
// deliver the result to `response` / `done` (confirm against the macro definition).
void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* controller,
                                          const AlterObjStoreInfoRequest* request,
                                          AlterObjStoreInfoResponse* response,
                                          ::google::protobuf::Closure* done) {
    std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id;
    // Only written by extract_object_storage_info(); initialised so it is never indeterminate.
    bool use_path_style = false;
    EncryptionInfoPB encryption_info;
    AkSkPair cipher_ak_sk_pair;
    RPC_PREPROCESS(alter_storage_vault, get, put, del);
    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_S3_VAULT:
    case AlterObjStoreInfoRequest::DROP_S3_VAULT: {
        auto tmp_desc = ObjectStorageDesc {ak,       sk,
                                           bucket,   prefix,
                                           endpoint, external_endpoint,
                                           region,   use_path_style,
                                           role_arn, external_id};
        if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info,
                                             cipher_ak_sk_pair)) {
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
        // It should at least has one hdfs info or obj info inside storage vault
        if ((!request->has_vault())) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "storage vault is set " + proto_to_json(*request);
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::ADD_HDFS_INFO:
    case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
        if (!request->has_vault() || !request->vault().has_name()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "hdfs info is not found " + proto_to_json(*request);
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
        if (!request->has_vault() || !request->vault().has_name()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "hdfs info is not found " + proto_to_json(*request);
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_S3_VAULT:
        break;
    case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT:
        break;
    case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT:
        break;
    case AlterObjStoreInfoRequest::UNKNOWN: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter info " + proto_to_json(*request);
        return;
    } break;
    default:
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter obj store info, request info " + proto_to_json(*request);
        LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString());
        return;
    }

    // TODO(dx): check s3 info right

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(alter_obj_store_info)
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }

    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_S3_VAULT: {
        if (!instance.enable_storage_vault()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "Storage vault doesn't support storage vault";
            return;
        }
        auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();
        if (!obj.has_provider()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf lease provider info";
            return;
        }
        if (instance.obj_info().size() >= 10) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "this instance history has greater than 10 objs, please new another instance";
            return;
        }
        // ATTN: prefix may be empty
        if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() ||
            endpoint.empty() || region.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err, please check it";
            return;
        }

        if (!role_arn.empty()) {
            if (!obj.has_cred_provider_type() ||
                obj.cred_provider_type() != CredProviderTypePB::INSTANCE_PROFILE ||
                !obj.has_provider() || obj.provider() != ObjectStoreInfoPB::S3) {
                code = MetaServiceCode::INVALID_ARGUMENT;
                msg = "s3 conf info err with role_arn, please check it";
                return;
            }
        }

        auto& objs = instance.obj_info();
        for (auto& it : objs) {
            if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() &&
                region == it.region() && ak == it.ak() && sk == it.sk() &&
                obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) {
                // err, anything not changed
                code = MetaServiceCode::INVALID_ARGUMENT;
                msg = "original obj infos has a same conf, please check it";
                return;
            }
        }
        // calc id
        auto tmp_tuple = ObjectStorageDesc {ak,       sk,
                                            bucket,   prefix,
                                            endpoint, external_endpoint,
                                            region,   use_path_style,
                                            role_arn, external_id};
        ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance,
                                                             encryption_info, cipher_ak_sk_pair);
        if (instance.storage_vault_names().end() !=
            std::find_if(instance.storage_vault_names().begin(),
                         instance.storage_vault_names().end(),
                         [&](const std::string& candidate_name) {
                             return candidate_name == request->vault().name();
                         })) {
            code = MetaServiceCode::ALREADY_EXISTED;
            msg = fmt::format("vault_name={} already created", request->vault().name());
            return;
        }
        StorageVaultPB vault;
        vault.set_id(last_item.id());
        vault.set_name(request->vault().name());
        // Names and resource ids are kept as parallel lists: append to both.
        *instance.mutable_resource_ids()->Add() = vault.id();
        *instance.mutable_storage_vault_names()->Add() = vault.name();
        vault.mutable_obj_info()->MergeFrom(last_item);
        auto vault_key = storage_vault_key({instance.instance_id(), last_item.id()});
        txn->put(vault_key, vault.SerializeAsString());
        if (request->has_set_as_default_storage_vault() &&
            request->set_as_default_storage_vault()) {
            response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
            set_default_vault_log_helper(instance, vault.name(), vault.id());
            instance.set_default_storage_vault_id(vault.id());
            instance.set_default_storage_vault_name(vault.name());
        }
        response->set_storage_vault_id(vault.id());
        LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault.id(),
                 vault.name(), hex(vault_key));
    } break;
    case AlterObjStoreInfoRequest::ADD_HDFS_INFO: {
        // The request's vault is passed as mutable (const_cast); its id is read back from the
        // request below once the vault has been added.
        if (auto ret = add_vault_into_instance(
                    instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
            ret != 0) {
            return;
        }
        if (request->has_set_as_default_storage_vault() &&
            request->set_as_default_storage_vault()) {
            response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
            set_default_vault_log_helper(instance, *instance.storage_vault_names().rbegin(),
                                         *instance.resource_ids().rbegin());
            instance.set_default_storage_vault_id(*instance.resource_ids().rbegin());
            instance.set_default_storage_vault_name(*instance.storage_vault_names().rbegin());
        }
        response->set_storage_vault_id(request->vault().id());
        break;
    }
    case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: {
        // If the resource ids is empty then it would be the first vault
        if (!instance.resource_ids().empty()) {
            std::stringstream ss;
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "Default vault can not be modified";
            msg = ss.str();
            return;
        }
        if (auto ret = add_vault_into_instance(
                    instance, txn.get(), const_cast<StorageVaultPB&>(request->vault()), code, msg);
            ret != 0) {
            return;
        }
        // NOTE(review): this path returns without writing `instance` back or committing `txn`,
        // so whatever add_vault_into_instance() staged is not committed here. Confirm this is
        // intended before changing it.
        return;
    }
    case AlterObjStoreInfoRequest::DROP_HDFS_INFO: {
        if (auto ret = remove_hdfs_storage_vault(instance, txn.get(), request->vault(), code, msg);
            ret != 0) {
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::SET_DEFAULT_VAULT: {
        const auto& name = request->vault().name();
        auto name_itr = std::find_if(instance.storage_vault_names().begin(),
                                     instance.storage_vault_names().end(),
                                     [&](const auto& vault_name) { return name == vault_name; });
        if (name_itr == instance.storage_vault_names().end()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "invalid storage vault name, name =" << name;
            msg = ss.str();
            return;
        }
        auto pos = name_itr - instance.storage_vault_names().begin();
        std::string vault_id = instance.resource_ids().begin()[pos];
        response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id());
        set_default_vault_log_helper(instance, name, vault_id);
        instance.set_default_storage_vault_id(vault_id);
        instance.set_default_storage_vault_name(name);
        response->set_storage_vault_id(vault_id);
        break;
    }
    case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT: {
        LOG_INFO("unset instance's default vault, instance id {}, previous default vault {}, id {}",
                 instance.instance_id(), instance.default_storage_vault_name(),
                 instance.default_storage_vault_id());
        instance.clear_default_storage_vault_id();
        instance.clear_default_storage_vault_name();
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
        // On failure `code` / `msg` are already set. Bail out so a rejected alter never writes
        // `instance` back or commits the transaction.
        if (auto ret = alter_s3_storage_vault(instance, txn, request->vault(), code, msg,
                                              response);
            ret != 0) {
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
        // Same as above: a failed alter must not be followed by a commit.
        if (auto ret = alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg,
                                                response);
            ret != 0) {
            return;
        }
        break;
    }
    case AlterObjStoreInfoRequest::DROP_S3_VAULT:
        [[fallthrough]];
    default: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }
    }

    LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
              << " s3 history info, and instance = " << proto_to_json(instance);

    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    // Bump the instance-update counter and write the instance in the same transaction as the
    // vault changes staged above.
    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}

// RPC handler that adds an object store entry to an instance or changes the credentials of
// an existing entry.
//
// Ops handled by the second switch below:
//   - LEGACY_UPDATE_AK_SK / ALTER_OBJ_INFO: on the obj info whose id equals
//     request->obj().id(), either replace ak/sk (when no role_arn was extracted) or switch
//     the entry to role_arn/external_id.
//   - ADD_OBJ_INFO: append a new obj info. Rejected when the instance has storage vault
//     enabled, when the provider is missing, when 10 entries already exist, when a
//     mandatory field is empty, or when an identical configuration is already stored.
// Every other op ends with INVALID_ARGUMENT.
// NOTE(review): UPDATE_AK_SK passes the first switch but has no case in the second one, so
// it ends in the default branch -- confirm this is intended.
//
// On success the updated InstanceInfoPB is written back under the instance key, and the
// instance-update counter key is bumped, in one transaction. The outcome is reported through
// `code` / `msg`, which are provided by RPC_PREPROCESS.
void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* controller,
                                           const AlterObjStoreInfoRequest* request,
                                           AlterObjStoreInfoResponse* response,
                                           ::google::protobuf::Closure* done) {
    std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region, role_arn, external_id;
    // Initialized so that no indeterminate value is ever read through the descriptor.
    bool use_path_style = false;
    EncryptionInfoPB encryption_info;
    AkSkPair cipher_ak_sk_pair;
    RPC_PREPROCESS(alter_obj_store_info, get, put);
    switch (request->op()) {
    case AlterObjStoreInfoRequest::ADD_OBJ_INFO:
    case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK:
    case AlterObjStoreInfoRequest::ALTER_OBJ_INFO:
    case AlterObjStoreInfoRequest::UPDATE_AK_SK: {
        // The descriptor is built from the locals above; they are read again further down,
        // so extract_object_storage_info presumably fills them through it.
        auto tmp_desc = ObjectStorageDesc {ak,       sk,
                                           bucket,   prefix,
                                           endpoint, external_endpoint,
                                           region,   use_path_style,
                                           role_arn, external_id};
        if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info,
                                             cipher_ak_sk_pair)) {
            return;
        }
    } break;
    case AlterObjStoreInfoRequest::UNKNOWN: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter info " + proto_to_json(*request);
        return;
    } break;
    default:
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "Unknown alter obj store info, request info " + proto_to_json(*request);
        LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString());
        return;
    }

    // TODO(dx): check s3 info right

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(alter_obj_store_info)
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }

    switch (request->op()) {
    case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK:
    case AlterObjStoreInfoRequest::ALTER_OBJ_INFO: {
        // std::stoi throws on non-numeric text. The id comes from the request (and from
        // stored metadata), so turn the exception into a plain failure instead of letting
        // it escape the RPC handler.
        auto parse_obj_id = [](const std::string& text, int* out) {
            try {
                *out = std::stoi(text);
                return true;
            } catch (const std::exception&) {
                return false;
            }
        };

        // get id
        std::string id = request->obj().has_id() ? request->obj().id() : "0";
        int idx = 0;
        if (!parse_obj_id(id, &idx) || idx < 1 || idx > instance.obj_info().size()) {
            // err
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "id invalid, please check it";
            return;
        }
        auto& obj_info = *instance.mutable_obj_info();
        for (auto& it : obj_info) {
            int stored_idx = 0;
            if (!parse_obj_id(it.id(), &stored_idx)) {
                LOG(WARNING) << "skip obj info with non-numeric id=" << it.id()
                             << " instance_id=" << instance_id;
                continue;
            }
            if (stored_idx != idx) {
                continue;
            }

            if (role_arn.empty()) {
                // ak/sk mode: drop any role based credentials and store the new ak/sk.
                if (it.ak() == ak && it.sk() == sk) {
                    // not change, just return ok
                    code = MetaServiceCode::OK;
                    msg = "ak/sk not changed";
                    return;
                }
                it.clear_role_arn();
                it.clear_external_id();
                it.clear_cred_provider_type();

                it.set_ak(ak);
                it.set_sk(sk);
                it.mutable_encryption_info()->CopyFrom(encryption_info);
            } else {
                // role_arn mode: mutually exclusive with ak/sk and limited to the S3 provider.
                if (!ak.empty() || !sk.empty()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    msg = "invaild argument, both set ak/sk and role_arn is not allowed";
                    LOG(INFO) << msg;
                    return;
                }

                if (it.provider() != ObjectStoreInfoPB::S3) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    msg = "role_arn is only supported for s3 provider";
                    LOG(INFO) << msg << " provider=" << it.provider();
                    return;
                }

                if (it.role_arn() == role_arn && it.external_id() == external_id) {
                    // not change, just return ok
                    code = MetaServiceCode::OK;
                    msg = "ak/sk not changed";
                    return;
                }
                it.clear_ak();
                it.clear_sk();
                it.clear_encryption_info();

                it.set_role_arn(role_arn);
                it.set_external_id(external_id);
                it.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
            }

            // Record the modification time in seconds since the epoch.
            auto now_time = std::chrono::system_clock::now();
            uint64_t time = std::chrono::duration_cast<std::chrono::seconds>(
                                    now_time.time_since_epoch())
                                    .count();
            it.set_mtime(time);
        }
    } break;
    case AlterObjStoreInfoRequest::ADD_OBJ_INFO: {
        if (instance.enable_storage_vault()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "Storage vault doesn't support add obj info";
            return;
        }
        auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info();
        if (!obj.has_provider()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf lease provider info";
            return;
        }
        if (instance.obj_info().size() >= 10) {
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "this instance history has greater than 10 objs, please new another instance";
            return;
        }
        // ATTN: prefix may be empty, so it is deliberately not part of this check.
        // Credentials are required either as an ak/sk pair or as a role_arn.
        if (((ak.empty() || sk.empty()) && role_arn.empty()) || bucket.empty() ||
            endpoint.empty() || region.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "s3 conf info err, please check it";
            return;
        }

        auto& objs = instance.obj_info();
        for (auto& it : objs) {
            if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() &&
                region == it.region() && ak == it.ak() && sk == it.sk() &&
                obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) {
                // err, anything not changed
                code = MetaServiceCode::INVALID_ARGUMENT;
                msg = "original obj infos has a same conf, please check it";
                return;
            }
        }
        // calc id
        auto tmp_tuple = ObjectStorageDesc {ak,       sk,
                                            bucket,   prefix,
                                            endpoint, external_endpoint,
                                            region,   use_path_style,
                                            role_arn, external_id};
        ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance,
                                                             encryption_info, cipher_ak_sk_pair);
        instance.add_obj_info()->CopyFrom(last_item);
        LOG_INFO("Instance {} tries to put obj info", instance.instance_id());
    } break;
    default: {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }
    }

    LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
              << " s3 history info, and instance = " << proto_to_json(instance);

    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}

// RPC handler that applies the credentials carried by `request` to an instance and, when
// snapshots are not disabled for it, to every instance returned by find_cascade_instances().
//
// The root instance is read, updated through update_instance_ak_sk() and committed first.
// Each derived instance is then processed in its own transaction; a failure on one derived
// instance is only logged and does not change the RPC result or stop the remaining ones.
// `code` / `msg` / `txn` / `ss` / `instance_id` are provided by RPC_PREPROCESS.
void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
                                   const UpdateAkSkRequest* request, UpdateAkSkResponse* response,
                                   ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(update_ak_sk, get, put);
    instance_id = request->has_instance_id() ? request->instance_id() : "";
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "instance id not set";
        return;
    }
    const bool carries_update =
            request->has_ram_user() || !request->internal_bucket_user().empty();
    if (!carries_update) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "nothing to update";
        return;
    }
    RPC_RATE_LIMIT(update_ak_sk)

    InstanceKeyInfo root_key_info {instance_id};
    std::string root_key;
    std::string root_val;
    instance_key(root_key_info, &root_key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(root_key, &root_val);
    LOG(INFO) << "get instance_key=" << hex(root_key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(root_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }

    // One timestamp (seconds since epoch) is shared by the root and all derived updates.
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    uint64_t now_seconds = std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();

    std::stringstream root_record;

    // Apply the request to the root instance.
    if (0 != update_instance_ak_sk(instance, request, now_seconds, code, msg, root_record)) {
        return;
    }

    LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size()
              << " s3 history info, and " << instance.resource_ids_size() << " vaults "
              << "instance = " << proto_to_json(instance);

    root_val = instance.SerializeAsString();
    if (root_val.empty()) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        msg = "failed to serialize";
        return;
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(root_key, root_val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(root_key);

    // Commit root instance first to avoid transaction timeout
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit root instance kv txn, err={}", err);
        LOG(WARNING) << msg;
        return;
    }

    LOG(INFO) << root_record.str();
    async_notify_refresh_instance(txn_kv_, instance_id, true);

    // Cascade update to derived instances using separate transactions
    // update_ak_sk is idempotent, so it's safe to use independent transactions
    const bool snapshot_in_use = instance.has_snapshot_switch_status() &&
                                 instance.snapshot_switch_status() != SNAPSHOT_SWITCH_DISABLED;
    if (!snapshot_in_use) {
        LOG(INFO) << "snapshot disabled for instance_id=" << instance_id
                  << ", skip cascade updating derived instances";
        return;
    }

    std::vector<std::string> derived_ids;
    if (0 != find_cascade_instances(txn_kv_.get(), instance_id, &derived_ids)) {
        LOG(WARNING) << "failed to find derived instances for cascade update, instance_id="
                     << instance_id << ", root instance already updated successfully";
        return;
    }

    // Comma separated list of the derived ids, for logging only.
    std::string joined_ids;
    bool is_first_id = true;
    for (const auto& derived_id : derived_ids) {
        if (!is_first_id) {
            joined_ids += ", ";
        }
        joined_ids += derived_id;
        is_first_id = false;
    }
    LOG(INFO) << "Found " << derived_ids.size() << " derived instances to cascade update: ["
              << joined_ids << "]";

    // Updates a single derived instance in its own transaction; every failure is logged and
    // simply ends the work for that instance.
    auto update_derived = [&](const std::string& derived_id) {
        std::unique_ptr<Transaction> derived_txn;
        TxnErrorCode derived_err = txn_kv_->create_txn(&derived_txn);
        if (derived_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to create txn for derived instance, instance_id=" << derived_id
                         << " err=" << derived_err;
            return;
        }

        InstanceKeyInfo derived_key_info {derived_id};
        std::string derived_key;
        std::string derived_val;
        instance_key(derived_key_info, &derived_key);

        derived_err = derived_txn->get(derived_key, &derived_val);
        if (derived_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get derived instance, instance_id=" << derived_id
                         << " err=" << derived_err;
            return;
        }

        InstanceInfoPB derived_instance;
        if (!derived_instance.ParseFromString(derived_val)) {
            LOG(WARNING) << "failed to parse InstanceInfoPB for derived instance_id=" << derived_id;
            return;
        }

        // Same helper as for the root, with its own code/msg so the RPC result is untouched.
        std::stringstream derived_record;
        MetaServiceCode derived_code = MetaServiceCode::OK;
        std::string derived_msg;
        if (0 != update_instance_ak_sk(derived_instance, request, now_seconds, derived_code,
                                       derived_msg, derived_record)) {
            LOG(WARNING) << "failed to update derived instance, instance_id=" << derived_id
                         << " msg=" << derived_msg;
            return;
        }

        derived_val = derived_instance.SerializeAsString();
        if (derived_val.empty()) {
            LOG(WARNING) << "failed to serialize derived instance, instance_id=" << derived_id;
            return;
        }

        derived_txn->put(derived_key, derived_val);

        derived_err = derived_txn->commit();
        if (derived_err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to commit derived instance txn, instance_id=" << derived_id
                         << " err=" << derived_err;
            return;
        }

        async_notify_refresh_instance(txn_kv_, derived_id, true);
        LOG(INFO) << "cascade update for instance_id=" << derived_id << " "
                  << derived_record.str();
    };

    for (const auto& derived_id : derived_ids) {
        update_derived(derived_id);
    }
}

// RPC handler that creates a new instance record.
//
// Steps, in order:
//   1. If a ram user is given, require its user_id, ak and sk to be non-empty.
//   2. Build the InstanceInfoPB from the request. Storage vault mode is enabled exactly when
//      the request carries no obj_info; a given obj_info is passed through
//      create_object_info_with_encrypt() and appended; a given ram user is stored with the
//      ak/sk pair produced by encrypt_ak_sk_helper().
//   3. When config::enable_multi_version_status is set, initialise the multi-version and
//      snapshot fields.
//   4. Reject an empty instance id, then (if the request has a vault) add it to the instance
//      under the built-in vault name.
//   5. Fail with ALREADY_EXISTED if the instance key is present, otherwise write the
//      serialized instance, bump the instance-update counter key and commit.
//   6. After a successful commit, asynchronously notify a refresh of the instance.
//
// The outcome is reported through `code` / `msg`, which are provided by RPC_PREPROCESS.
void MetaServiceImpl::create_instance(google::protobuf::RpcController* controller,
                                      const CreateInstanceRequest* request,
                                      CreateInstanceResponse* response,
                                      ::google::protobuf::Closure* done) {
    TEST_SYNC_POINT_CALLBACK("create_instance_sk_request",
                             const_cast<CreateInstanceRequest**>(&request));
    RPC_PREPROCESS(create_instance, get, put);
    TEST_SYNC_POINT_RETURN_WITH_VOID("create_instance_sk_request_return");
    if (request->has_ram_user()) {
        auto& ram_user = request->ram_user();
        std::string ram_user_id = ram_user.has_user_id() ? ram_user.user_id() : "";
        std::string ram_user_ak = ram_user.has_ak() ? ram_user.ak() : "";
        std::string ram_user_sk = ram_user.has_sk() ? ram_user.sk() : "";
        if (ram_user_id.empty() || ram_user_ak.empty() || ram_user_sk.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "ram user info err, please check it";
            return;
        }
    }
    instance_id = request->instance_id();
    // Prepare data
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_user_id(request->has_user_id() ? request->user_id() : "");
    instance.set_name(request->has_name() ? request->name() : "");
    instance.set_status(InstanceInfoPB::NORMAL);
    instance.set_sse_enabled(request->sse_enabled());
    instance.set_enable_storage_vault(!request->has_obj_info());
    if (request->has_obj_info()) {
        // The helper receives a mutable pointer to the request's obj_info; the (possibly
        // modified) obj_info is merged into the instance afterwards.
        create_object_info_with_encrypt(instance,
                                        const_cast<ObjectStoreInfoPB*>(&request->obj_info()),
                                        request->sse_enabled(), code, msg);
        if (code != MetaServiceCode::OK) {
            return;
        }
        instance.mutable_obj_info()->Add()->MergeFrom(request->obj_info());
    }
    if (request->has_ram_user()) {
        auto& ram_user = request->ram_user();
        EncryptionInfoPB encryption_info;
        AkSkPair cipher_ak_sk_pair;
        if (encrypt_ak_sk_helper(ram_user.ak(), ram_user.sk(), &encryption_info, &cipher_ak_sk_pair,
                                 code, msg) != 0) {
            return;
        }
        // Store a copy of the ram user whose ak/sk are replaced by the helper's output pair.
        RamUserPB new_ram_user;
        new_ram_user.CopyFrom(ram_user);
        new_ram_user.set_ak(std::move(cipher_ak_sk_pair.first));
        new_ram_user.set_sk(std::move(cipher_ak_sk_pair.second));
        new_ram_user.mutable_encryption_info()->CopyFrom(encryption_info);
        instance.mutable_ram_user()->CopyFrom(new_ram_user);
    }
    if (config::enable_multi_version_status) {
        instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_READ_WRITE);
        instance.set_snapshot_switch_status(SNAPSHOT_SWITCH_OFF);
        if (config::enable_cluster_snapshot) {
            instance.set_snapshot_switch_status(SNAPSHOT_SWITCH_ON);
            instance.set_snapshot_interval_seconds(config::snapshot_min_interval_seconds);
            instance.set_max_reserved_snapshot(1);
        }
    }

    if (instance.instance_id().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "instance id not set";
        return;
    }

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    if (request->has_vault()) {
        // The vault supplied at creation time is always registered under the built-in name.
        auto& param = const_cast<StorageVaultPB&>(request->vault());
        param.set_name(BUILT_IN_STORAGE_VAULT_NAME.data());
        if (0 != add_vault_into_instance(instance, txn.get(), param, code, msg)) {
            return;
        }
    }

    InstanceKeyInfo key_info {request->instance_id()};
    std::string key;
    std::string val = instance.SerializeAsString();
    instance_key(key_info, &key);
    if (val.empty()) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        msg = "failed to serialize";
        LOG(ERROR) << msg;
        return;
    }

    // `val` has already been serialized above, so masking the access keys here only affects
    // the log line below, not the stored record.
    for (auto& obj_info : *instance.mutable_obj_info()) {
        obj_info.set_ak(hide_access_key(obj_info.ak()));
    }

    LOG(INFO) << "xxx instance json=" << proto_to_json(instance);

    // Check existence before proceeding. The lookup reads into a scratch buffer so that the
    // serialized instance in `val` can never be overwritten by it.
    std::string existing_val;
    err = txn->get(key, &existing_val);
    if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        std::stringstream ss;
        ss << (err == TxnErrorCode::TXN_OK ? "instance already existed"
                                           : "internal error failed to check instance")
           << ", instance_id=" << request->instance_id() << ", err=" << err;
        code = err == TxnErrorCode::TXN_OK ? MetaServiceCode::ALREADY_EXISTED
                                           : cast_as<ErrCategory::READ>(err);
        msg = ss.str();
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << request->instance_id() << " instance_key=" << hex(key);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
        // Nothing was created, so there is nothing to refresh.
        return;
    }

    async_notify_refresh_instance(
            txn_kv_, request->instance_id(), /*include_self=*/true,
            [instance_id = request->instance_id()](const KVStats& stats) {
                if (config::use_detailed_metrics && !instance_id.empty()) {
                    g_bvar_rpc_kv_create_instance_get_bytes.put({instance_id}, stats.get_bytes);
                    g_bvar_rpc_kv_create_instance_get_counter.put({instance_id}, stats.get_counter);
                }
            });
}

// Applies the ENABLE_SNAPSHOT property to `instance`.
//
// @param instance_id  id used in messages and logs.
// @param value        must be exactly "true" or "false".
// @param instance     instance to modify; must not be null.
// @return {OK, ""} on success, otherwise {INVALID_ARGUMENT, reason} when
//         - `value` is neither "true" nor "false",
//         - the snapshot switch status is unset or SNAPSHOT_SWITCH_DISABLED,
//         - "false" is requested while the switch is already SNAPSHOT_SWITCH_OFF,
//         - "true" is requested while multi_version_status is not MULTI_VERSION_READ_WRITE.
std::pair<MetaServiceCode, std::string> handle_snapshot_switch(const std::string& instance_id,
                                                               const std::string& value,
                                                               InstanceInfoPB* instance) {
    const std::string& property_name =
            AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT);
    if (value != "true" && value != "false") {
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT,
                              "Invalid value for " + property_name + " property: " + value +
                                      ", expected 'true' or 'false'" +
                                      ", instance_id: " + instance_id);
    }

    // Snapshot cannot be toggled while the switch status is unset or DISABLED.
    if (!instance->has_snapshot_switch_status() ||
        instance->snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) {
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT,
                              "Snapshot is not ready, instance_id: " + instance_id);
    } else if (value == "false" && instance->snapshot_switch_status() == SNAPSHOT_SWITCH_OFF) {
        return std::make_pair(
                MetaServiceCode::INVALID_ARGUMENT,
                "Snapshot is already set to SNAPSHOT_SWITCH_OFF, instance_id: " + instance_id);
    } else if (value == "true" && instance->multi_version_status() != MULTI_VERSION_READ_WRITE) {
        // If the multi_version_status is not READ_WRITE, cannot enable snapshot because the
        // operation logs will be recycled directly since min_versionstamp is not set when multi
        // version status is WRITE_ONLY
        const std::string url =
                "${MS_ENDPOINT}/MetaService/http/"
                "set_multi_version_status?multi_version_status=MULTI_VERSION_READ_WRITE";
        // The hint must show the URL to curl, not the instance id.
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT,
                              fmt::format("Cannot enable snapshot when multi_version_status is not "
                                          "MULTI_VERSION_READ_WRITE. Consider enabling "
                                          "MULTI_VERSION_READ_WRITE status by curl {}",
                                          url));
    } else if (value == "true") {
        instance->set_snapshot_switch_status(SNAPSHOT_SWITCH_ON);

        // Set default values when first enabling snapshot
        if (!instance->has_snapshot_interval_seconds() ||
            instance->snapshot_interval_seconds() == 0) {
            instance->set_snapshot_interval_seconds(config::snapshot_min_interval_seconds);
            LOG(INFO) << "Set default snapshot_interval_seconds to "
                      << config::snapshot_min_interval_seconds << " for instance " << instance_id;
        }
        if (!instance->has_max_reserved_snapshot()) {
            // Disable auto snapshot by default
            instance->set_max_reserved_snapshot(0);
            LOG(INFO) << "Set default max_reserved_snapshots to 0 for instance " << instance_id;
        }
    } else {
        instance->set_snapshot_switch_status(SNAPSHOT_SWITCH_OFF);
    }

    LOG(INFO) << "Set snapshot " + property_name + " to " + value + " for instance " + instance_id;

    return std::make_pair(MetaServiceCode::OK, "");
}

// Applies the MAX_RESERVED_SNAPSHOTS property to `instance`.
//
// `value` is converted with std::stoi and must lie in
// [0, config::snapshot_max_reserved_num]. Returns {OK, ""} after storing the number on the
// instance, or {INVALID_ARGUMENT, reason} when the conversion throws or the number is out of
// range.
std::pair<MetaServiceCode, std::string> handle_max_reserved_snapshots(
        const std::string& instance_id, const std::string& value, InstanceInfoPB* instance) {
    const std::string& prop_name = AlterInstanceRequest::SnapshotProperty_Name(
            AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS);

    // Shorthand for the rejection result.
    auto reject = [](std::string reason) {
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, std::move(reason));
    };

    int reserved_count = 0;
    try {
        reserved_count = std::stoi(value);
        if (reserved_count < 0) {
            return reject(prop_name + " must be non-negative, got: " + value);
        }
        if (reserved_count > config::snapshot_max_reserved_num) {
            return reject(prop_name + " too large, maximum is " +
                          std::to_string(config::snapshot_max_reserved_num) + ", got: " + value);
        }
    } catch (const std::exception&) {
        return reject("Invalid numeric value for max_reserved_snapshots: " + value);
    }

    instance->set_max_reserved_snapshot(reserved_count);

    LOG(INFO) << "Set " << prop_name << " to " << value << " for instance " << instance_id;

    return std::make_pair(MetaServiceCode::OK, std::string());
}

// Applies the SNAPSHOT_INTERVAL_SECONDS property to `instance`.
//
// `value` is converted with std::stoi and must be at least
// config::snapshot_min_interval_seconds. Returns {OK, ""} after storing the interval on the
// instance, or {INVALID_ARGUMENT, reason} when the conversion throws or the interval is too
// small.
std::pair<MetaServiceCode, std::string> handle_snapshot_intervals(const std::string& instance_id,
                                                                  const std::string& value,
                                                                  InstanceInfoPB* instance) {
    const std::string& prop_name = AlterInstanceRequest::SnapshotProperty_Name(
            AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS);

    // Shorthand for the rejection result.
    auto reject = [](std::string reason) {
        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, std::move(reason));
    };

    int interval_seconds = 0;
    try {
        interval_seconds = std::stoi(value);
        if (interval_seconds < config::snapshot_min_interval_seconds) {
            return reject(prop_name + " too small, minimum is " +
                          std::to_string(config::snapshot_min_interval_seconds) +
                          " seconds, got: " + value);
        }
    } catch (const std::exception&) {
        return reject("Invalid numeric value for " + prop_name + ": " + value);
    }

    instance->set_snapshot_interval_seconds(interval_seconds);

    LOG(INFO) << "Set " << prop_name << " to " << value << " seconds for instance "
              << instance_id;

    return std::make_pair(MetaServiceCode::OK, std::string());
}

/// RPC entry point that alters a single instance according to `request->op()`.
///
/// Every operation except REFRESH is executed through the
/// `alter_instance(request, action)` overload, which hands the stored
/// InstanceInfoPB to the lambda given here and persists the serialized bytes
/// the lambda returns.
///
/// Operations:
/// - DROP: rejected while the instance has clusters or any snapshot whose
///   status is not SNAPSHOT_RECYCLED; otherwise marks the instance DELETED and,
///   when it records a non-empty source instance and source snapshot id,
///   removes the matching snapshot reference key.
/// - RENAME: sets the instance name; `request->name()` must be non-empty.
/// - ENABLE_SSE / DISABLE_SSE: flips `sse_enabled` on the instance and on every
///   obj_info; rejected when the instance is already in the requested state.
/// - REFRESH: delegates to `resource_mgr_->refresh_instance()`.
/// - SET_OVERDUE / SET_NORMAL: changes the status; rejected for DELETED
///   instances and when the instance already has the requested status.
/// - SET_SNAPSHOT_PROPERTY: validates and applies each entry of
///   `request->properties()`.
/// - anything else: INVALID_ARGUMENT.
///
/// The response status is filled in by the deferred block below. For every op
/// other than REFRESH, `async_notify_refresh_instance` is called afterwards
/// regardless of the resulting code.
void MetaServiceImpl::alter_instance(google::protobuf::RpcController* controller,
                                     const AlterInstanceRequest* request,
                                     AlterInstanceResponse* response,
                                     ::google::protobuf::Closure* done) {
    StopWatch sw;
    auto ctrl = static_cast<brpc::Controller*>(controller);
    LOG(INFO) << __PRETTY_FUNCTION__ << " rpc from " << ctrl->remote_side()
              << " request=" << request->ShortDebugString();
    brpc::ClosureGuard closure_guard(done);
    MetaServiceCode code = MetaServiceCode::OK;
    std::string msg = "OK";
    [[maybe_unused]] std::stringstream ss;
    std::string instance_id = request->has_instance_id() ? request->instance_id() : "";
    // Deferred epilogue: publishes code/msg to the response, logs the outcome,
    // resets the closure guard and records the elapsed time when detailed
    // metrics are enabled and an instance id was supplied.
    DORIS_CLOUD_DEFER {
        response->mutable_status()->set_code(code);
        response->mutable_status()->set_msg(msg);
        LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
                  << __PRETTY_FUNCTION__ << " " << ctrl->remote_side() << " " << msg;
        closure_guard.reset(nullptr);
        if (config::use_detailed_metrics && !instance_id.empty()) {
            g_bvar_ms_alter_instance.put(instance_id, sw.elapsed_us());
        }
    };

    // On success each lambda returns {OK, serialized InstanceInfoPB}; on
    // failure it returns {error code, error message}.
    std::pair<MetaServiceCode, std::string> ret;
    switch (request->op()) {
    case AlterInstanceRequest::DROP: {
        ret = alter_instance(
                request, [&request, &instance_id](Transaction* txn, InstanceInfoPB* instance) {
                    std::string msg;
                    // check instance doesn't have any cluster.
                    if (instance->clusters_size() != 0) {
                        msg = "failed to drop instance, instance has clusters";
                        LOG(WARNING) << msg;
                        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                    }

                    // check instance doesn't have any snapshot that is not yet recycled.
                    MetaReader meta_reader(instance_id);
                    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
                    TxnErrorCode err = meta_reader.get_snapshots(txn, &snapshots);
                    if (err != TxnErrorCode::TXN_OK) {
                        msg = "failed to get snapshots";
                        LOG(WARNING) << msg << " err=" << err;
                        return std::make_pair(cast_as<ErrCategory::READ>(err), msg);
                    }
                    for (auto& [snapshot, _] : snapshots) {
                        if (snapshot.status() != SnapshotStatus::SNAPSHOT_RECYCLED) {
                            // still has snapshots, cannot drop
                            msg = "failed to drop instance, instance has snapshots";
                            LOG(WARNING) << msg;
                            return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                        }
                    }

                    instance->set_status(InstanceInfoPB::DELETED);
                    instance->set_mtime(
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

                    std::string ret = instance->SerializeAsString();
                    if (ret.empty()) {
                        msg = "failed to serialize";
                        LOG(ERROR) << msg;
                        return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
                    }
                    LOG(INFO) << "put instance_id=" << request->instance_id()
                              << "drop instance json=" << proto_to_json(*instance);

                    // The instance records the instance/snapshot it was created from:
                    // remove the corresponding snapshot reference key in the same txn.
                    if (instance->has_source_instance_id() && instance->has_source_snapshot_id() &&
                        !instance->source_instance_id().empty() &&
                        !instance->source_snapshot_id().empty()) {
                        Versionstamp snapshot_versionstamp;
                        if (!SnapshotManager::parse_snapshot_versionstamp(
                                    instance->source_snapshot_id(), &snapshot_versionstamp)) {
                            msg = "failed to parse snapshot_id to versionstamp, snapshot_id=" +
                                  instance->source_snapshot_id();
                            LOG(WARNING) << msg;
                            return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                        }
                        versioned::SnapshotReferenceKeyInfo ref_key_info {
                                instance->source_instance_id(), snapshot_versionstamp, instance_id};
                        std::string reference_key = versioned::snapshot_reference_key(ref_key_info);
                        txn->remove(reference_key);
                    }
                    return std::make_pair(MetaServiceCode::OK, ret);
                });
    } break;
    case AlterInstanceRequest::RENAME: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;
            std::string name = request->has_name() ? request->name() : "";
            if (name.empty()) {
                msg = "rename instance name, but not set";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_name(name);

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(ERROR) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "rename instance json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    case AlterInstanceRequest::ENABLE_SSE: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;
            if (instance->sse_enabled()) {
                msg = "failed to enable sse, instance has enabled sse";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_sse_enabled(true);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            // keep every obj_info in sync with the instance-level flag
            for (auto& obj_info : *(instance->mutable_obj_info())) {
                obj_info.set_sse_enabled(true);
            }
            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(ERROR) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "instance enable sse json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    case AlterInstanceRequest::DISABLE_SSE: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;
            if (!instance->sse_enabled()) {
                msg = "failed to disable sse, instance has disabled sse";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_sse_enabled(false);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            // keep every obj_info in sync with the instance-level flag
            for (auto& obj_info : *(instance->mutable_obj_info())) {
                obj_info.set_sse_enabled(false);
            }
            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(ERROR) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "instance disable sse json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    case AlterInstanceRequest::REFRESH: {
        // REFRESH does not go through the transactional overload
        ret = resource_mgr_->refresh_instance(request->instance_id());
    } break;
    case AlterInstanceRequest::SET_OVERDUE: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;

            if (instance->status() == InstanceInfoPB::DELETED) {
                msg = "can't set deleted instance to overdue, instance_id = " +
                      request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            if (instance->status() == InstanceInfoPB::OVERDUE) {
                msg = "the instance has already set  instance to overdue, instance_id = " +
                      request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_status(InstanceInfoPB::OVERDUE);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "set instance overdue json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    case AlterInstanceRequest::SET_NORMAL: {
        ret = alter_instance(request, [&request](Transaction* txn, InstanceInfoPB* instance) {
            std::string msg;

            if (instance->status() == InstanceInfoPB::DELETED) {
                msg = "can't set deleted instance to normal, instance_id = " +
                      request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            if (instance->status() == InstanceInfoPB::NORMAL) {
                msg = "the instance is already normal, instance_id = " + request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            instance->set_status(InstanceInfoPB::NORMAL);
            instance->set_mtime(
                    duration_cast<seconds>(system_clock::now().time_since_epoch()).count());

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "set instance normal json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    /**
     * Handle SET_SNAPSHOT_PROPERTY operation - configures snapshot-related properties for an instance.
     *
     * Supported property keys and their expected values:
     * - "ENABLE_SNAPSHOT": "true" | "false"
     *   Controls whether snapshot functionality is enabled for the instance
     *
     * - "MAX_RESERVED_SNAPSHOTS": numeric string (0-config::snapshot_max_reserved_num)
     *   Sets the maximum number of snapshots to retain for the instance
     *
     * - "SNAPSHOT_INTERVAL_SECONDS": numeric string (config::snapshot_min_interval_seconds-max)
     *   Sets the snapshot creation interval in seconds (minimum controlled by config)
     *
     * Each property is validated by its respective handler function which ensures
     * the provided values conform to the expected format and constraints.
     */
    case AlterInstanceRequest::SET_SNAPSHOT_PROPERTY: {
        ret = alter_instance(request, [&request, &instance_id](Transaction* txn,
                                                               InstanceInfoPB* instance) {
            std::string msg;
            // Bind by reference: the request outlives this synchronous lambda, so
            // copying the whole property map (and each key/value) is unnecessary.
            const auto& properties = request->properties();
            if (properties.empty()) {
                msg = "snapshot properties is empty, instance_id = " + request->instance_id();
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
            }
            for (const auto& property : properties) {
                const std::string& key = property.first;
                const std::string& value = property.second;
                AlterInstanceRequest::SnapshotProperty snapshot_property =
                        AlterInstanceRequest::UNKNOWN_PROPERTY;
                if (!AlterInstanceRequest_SnapshotProperty_Parse(key, &snapshot_property) ||
                    snapshot_property == AlterInstanceRequest::UNKNOWN_PROPERTY) {
                    msg = "unknown snapshot property: " + key;
                    LOG(WARNING) << "alter instance failed: " << msg
                                 << ", instance_id = " << instance_id;
                    return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                }

                std::pair<MetaServiceCode, std::string> result;

                if (snapshot_property == AlterInstanceRequest::ENABLE_SNAPSHOT) {
                    result = handle_snapshot_switch(request->instance_id(), value, instance);
                } else if (snapshot_property == AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS) {
                    result = handle_max_reserved_snapshots(request->instance_id(), value, instance);
                } else if (snapshot_property == AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS) {
                    result = handle_snapshot_intervals(request->instance_id(), value, instance);
                } else {
                    msg = "unsupported property: " + key;
                    LOG(WARNING) << "alter instance failed: " << msg
                                 << ", instance_id = " << instance_id;
                    return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
                }

                // the first invalid property aborts the whole request
                if (result.first != MetaServiceCode::OK) {
                    return result;
                }
            }

            std::string ret = instance->SerializeAsString();
            if (ret.empty()) {
                msg = "failed to serialize";
                LOG(WARNING) << msg;
                return std::make_pair(MetaServiceCode::PROTOBUF_SERIALIZE_ERR, msg);
            }
            LOG(INFO) << "put instance_id=" << request->instance_id()
                      << "set instance snapshot property json=" << proto_to_json(*instance);
            return std::make_pair(MetaServiceCode::OK, ret);
        });
    } break;
    default: {
        ss << "invalid request op, op=" << request->op();
        ret = std::make_pair(MetaServiceCode::INVALID_ARGUMENT, ss.str());
    }
    }
    // hand the outcome to the deferred epilogue
    code = ret.first;
    msg = ret.second;

    if (request->op() == AlterInstanceRequest::REFRESH) return;

    // NOTE: issued regardless of `code`.
    async_notify_refresh_instance(txn_kv_, request->instance_id(), /*include_self=*/false);
}

// Resolves the instance that owns `request->cloud_unique_id()`, reads its
// InstanceInfoPB from the kv store and copies it into `response`.
// `code`, `msg`, `instance_id`, `txn` and `ss` are not declared in this
// function body; they come from RPC_PREPROCESS.
void MetaServiceImpl::get_instance(google::protobuf::RpcController* controller,
                                   const GetInstanceRequest* request, GetInstanceResponse* response,
                                   ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_instance, get);
    TEST_SYNC_POINT_CALLBACK("get_instance_sk_response", &response);
    TEST_SYNC_POINT_RETURN_WITH_VOID("get_instance_sk_response_return");

    // The caller must identify itself.
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud_unique_id must be given";
        return;
    }

    // Map the unique id to the owning instance.
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_instance);

    std::string encoded_key;
    std::string encoded_val;
    InstanceKeyInfo key_info {instance_id};
    instance_key(key_info, &encoded_key);

    TxnErrorCode txn_err = txn_kv_->create_txn(&txn);
    if (txn_err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(txn_err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << txn_err;
        return;
    }

    txn_err = txn->get(encoded_key, &encoded_val);
    LOG(INFO) << "get instance_key=" << hex(encoded_key);
    if (txn_err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(txn_err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << txn_err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB stored_instance;
    const bool parsed = stored_instance.ParseFromString(encoded_val);
    if (!parsed) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    response->mutable_instance()->CopyFrom(stored_instance);
}

// Loads the InstanceInfoPB of `request->instance_id()` in a fresh transaction,
// lets `action` mutate it, then writes back the bytes `action` returned and
// commits.
//
// `action` returns {OK, serialized instance} to proceed, or {error, message}
// to abort; in the latter case its result is returned unchanged and nothing is
// written. Returns {OK, "OK"} after a successful commit.
std::pair<MetaServiceCode, std::string> MetaServiceImpl::alter_instance(
        const cloud::AlterInstanceRequest* request,
        std::function<std::pair<MetaServiceCode, std::string>(Transaction*, InstanceInfoPB*)>
                action) {
    using Result = std::pair<MetaServiceCode, std::string>;

    const std::string instance_id = request->has_instance_id() ? request->instance_id() : "";
    if (instance_id.empty()) {
        const std::string reason = "instance id not set";
        LOG(WARNING) << reason;
        return Result(MetaServiceCode::INVALID_ARGUMENT, reason);
    }

    std::string key;
    InstanceKeyInfo key_info {instance_id};
    instance_key(key_info, &key);

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        const std::string reason = "failed to create txn";
        LOG(WARNING) << reason << " err=" << err;
        return Result(cast_as<ErrCategory::CREATE>(err), reason);
    }

    // The instance has to exist already.
    std::string stored_val;
    err = txn->get(key, &stored_val);
    if (err != TxnErrorCode::TXN_OK) {
        const bool missing = (err == TxnErrorCode::TXN_KEY_NOT_FOUND);
        std::stringstream ss;
        ss << (missing ? "instance not existed" : "internal error failed to check instance")
           << ", instance_id=" << request->instance_id() << ", err=" << err;
        const std::string reason = ss.str();
        // TODO(dx): fix CLUSTER_NOT_FOUND, VERSION_NOT_FOUND, TXN_LABEL_NOT_FOUND, etc to NOT_FOUND
        MetaServiceCode failure = missing ? MetaServiceCode::CLUSTER_NOT_FOUND
                                          : cast_as<ErrCategory::READ>(err);
        LOG(WARNING) << reason << " err=" << err;
        return Result(failure, reason);
    }
    LOG(INFO) << "alter instance key=" << hex(key);

    InstanceInfoPB instance;
    if (!instance.ParseFromString(stored_val)) {
        const std::string reason = "failed to parse InstanceInfoPB";
        LOG(WARNING) << reason;
        return Result(MetaServiceCode::PROTOBUF_PARSE_ERR, reason);
    }

    Result outcome = action(txn.get(), &instance);
    if (outcome.first != MetaServiceCode::OK) {
        return outcome;
    }

    // Increment the instance-update counter key and store the new instance bytes in one txn.
    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, outcome.second);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        const std::string reason = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << reason;
        return Result(cast_as<ErrCategory::COMMIT>(err), reason);
    }
    return Result(MetaServiceCode::OK, "OK");
}

// Adds `cluster` to the instance through the resource manager and forwards the
// resulting status code and message to the out-parameters.
void handle_add_cluster(const std::string& instance_id, const ClusterInfo& cluster,
                        std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                        MetaServiceCode& code) {
    auto [status, detail] = resource_mgr->add_cluster(instance_id, cluster);
    code = status;
    msg = std::move(detail);
}

// Drops `cluster` from the instance through the resource manager and forwards
// the resulting status code and message to the out-parameters.
void handle_drop_cluster(const std::string& instance_id, const ClusterInfo& cluster,
                         std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                         MetaServiceCode& code) {
    auto [status, detail] = resource_mgr->drop_cluster(instance_id, cluster);
    code = status;
    msg = std::move(detail);
}

void handle_update_cluster_mySQL_username(const std::string& instance_id,
                                          const ClusterInfo& cluster,
                                          std::shared_ptr<ResourceManager> resource_mgr,
                                          std::string& msg, MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>&) {
                auto& mysql_user_names = cluster.cluster.mysql_user_name();
                c.mutable_mysql_user_name()->CopyFrom(mysql_user_names);
                return "";
            });
}

void handle_add_node(const std::string& instance_id, const AlterClusterRequest* request,
                     std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                     MetaServiceCode& code) {
    resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false);
    if (!msg.empty()) {
        LOG(WARNING) << msg;
        return;
    }
    std::vector<NodeInfo> to_add;
    std::vector<NodeInfo> to_del;
    for (auto& n : request->cluster().nodes()) {
        NodeInfo node;
        node.instance_id = request->instance_id();
        node.node_info = n;
        node.cluster_id = request->cluster().cluster_id();
        node.cluster_name = request->cluster().cluster_name();
        node.role = (request->cluster().type() == ClusterPB::SQL
                             ? Role::SQL_SERVER
                             : (request->cluster().type() == ClusterPB::COMPUTE ? Role::COMPUTE_NODE
                                                                                : Role::UNDEFINED));
        node.node_info.set_status(NodeStatusPB::NODE_STATUS_RUNNING);
        to_add.emplace_back(std::move(node));
    }
    msg = resource_mgr->modify_nodes(instance_id, to_add, to_del);
}

void handle_drop_node(const std::string& instance_id, const AlterClusterRequest* request,
                      std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                      MetaServiceCode& code) {
    resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false);
    if (!msg.empty()) {
        LOG(WARNING) << msg;
        return;
    }
    std::vector<NodeInfo> to_add;
    std::vector<NodeInfo> to_del;
    for (auto& n : request->cluster().nodes()) {
        NodeInfo node;
        node.instance_id = request->instance_id();
        node.node_info = n;
        node.cluster_id = request->cluster().cluster_id();
        node.cluster_name = request->cluster().cluster_name();
        node.role = (request->cluster().type() == ClusterPB::SQL
                             ? Role::SQL_SERVER
                             : (request->cluster().type() == ClusterPB::COMPUTE ? Role::COMPUTE_NODE
                                                                                : Role::UNDEFINED));
        to_del.emplace_back(std::move(node));
    }
    msg = resource_mgr->modify_nodes(instance_id, to_add, to_del);
}

void handle_decommission_node(const std::string& instance_id, const AlterClusterRequest* request,
                              std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                              MetaServiceCode& code) {
    resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false);
    if (msg != "") {
        LOG(WARNING) << msg;
        return;
    }

    std::string be_unique_id = (request->cluster().nodes())[0].cloud_unique_id();
    std::vector<NodeInfo> nodes;
    std::string err = resource_mgr->get_node(be_unique_id, &nodes);
    if (!err.empty()) {
        LOG(INFO) << "failed to check instance info, err=" << err;
        msg = err;
        return;
    }

    std::vector<NodeInfo> decomission_nodes;
    for (auto& node : nodes) {
        for (auto req_node : request->cluster().nodes()) {
            bool ip_processed = false;
            if (node.node_info.has_ip() && req_node.has_ip()) {
                std::string endpoint =
                        node.node_info.ip() + ":" + std::to_string(node.node_info.heartbeat_port());
                std::string req_endpoint =
                        req_node.ip() + ":" + std::to_string(req_node.heartbeat_port());
                if (endpoint == req_endpoint) {
                    decomission_nodes.push_back(node);
                    node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING);
                }
                ip_processed = true;
            }

            if (!ip_processed && node.node_info.has_host() && req_node.has_host()) {
                std::string endpoint = node.node_info.host() + ":" +
                                       std::to_string(node.node_info.heartbeat_port());
                std::string req_endpoint =
                        req_node.host() + ":" + std::to_string(req_node.heartbeat_port());
                if (endpoint == req_endpoint) {
                    decomission_nodes.push_back(node);
                    node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING);
                }
            }
        }
    }

    {
        std::vector<NodeInfo> to_add;
        std::vector<NodeInfo>& to_del = decomission_nodes;
        msg = resource_mgr->modify_nodes(instance_id, to_add, to_del);
    }
    {
        std::vector<NodeInfo>& to_add = decomission_nodes;
        std::vector<NodeInfo> to_del;
        for (auto& node : to_add) {
            node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONING);
            LOG(INFO) << "decomission node, "
                      << "size: " << to_add.size() << " " << node.node_info.DebugString() << " "
                      << node.cluster_id << " " << node.cluster_name;
        }
        msg = resource_mgr->modify_nodes(instance_id, to_add, to_del);
    }
}

void handle_notify_decommissioned(const std::string& instance_id,
                                  const AlterClusterRequest* request,
                                  std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                                  MetaServiceCode& code) {
    resource_mgr->check_cluster_params_valid(request->cluster(), &msg, false, false);
    if (msg != "") {
        LOG(WARNING) << msg;
        return;
    }

    std::string be_unique_id = (request->cluster().nodes())[0].cloud_unique_id();
    std::vector<NodeInfo> nodes;
    std::string err = resource_mgr->get_node(be_unique_id, &nodes);
    if (!err.empty()) {
        LOG(INFO) << "failed to check instance info, err=" << err;
        msg = err;
        return;
    }

    std::vector<NodeInfo> decomission_nodes;
    for (auto& node : nodes) {
        for (auto req_node : request->cluster().nodes()) {
            bool ip_processed = false;
            if (node.node_info.has_ip() && req_node.has_ip()) {
                std::string endpoint =
                        node.node_info.ip() + ":" + std::to_string(node.node_info.heartbeat_port());
                std::string req_endpoint =
                        req_node.ip() + ":" + std::to_string(req_node.heartbeat_port());
                if (endpoint == req_endpoint) {
                    decomission_nodes.push_back(node);
                }
                ip_processed = true;
            }

            if (!ip_processed && node.node_info.has_host() && req_node.has_host()) {
                std::string endpoint = node.node_info.host() + ":" +
                                       std::to_string(node.node_info.heartbeat_port());
                std::string req_endpoint =
                        req_node.host() + ":" + std::to_string(req_node.heartbeat_port());
                if (endpoint == req_endpoint) {
                    decomission_nodes.push_back(node);
                }
            }
        }
    }

    {
        std::vector<NodeInfo> to_add;
        std::vector<NodeInfo>& to_del = decomission_nodes;
        msg = resource_mgr->modify_nodes(instance_id, to_add, to_del);
    }
    {
        std::vector<NodeInfo>& to_add = decomission_nodes;
        std::vector<NodeInfo> to_del;
        for (auto& node : to_add) {
            node.node_info.set_status(NodeStatusPB::NODE_STATUS_DECOMMISSIONED);
            LOG(INFO) << "notify node decomissioned, "
                      << " size: " << to_add.size() << " " << node.node_info.DebugString() << " "
                      << node.cluster_id << " " << node.cluster_name;
        }
        msg = resource_mgr->modify_nodes(instance_id, to_add, to_del);
    }
}

void handle_rename_cluster(const std::string& instance_id, const ClusterInfo& cluster,
                           std::shared_ptr<ResourceManager> resource_mgr, bool replace,
                           std::string& msg, MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>& clusters_in_instance) {
                std::string msg;
                std::stringstream ss;
                std::set<std::string> cluster_names;
                for (auto cluster_in_instance : clusters_in_instance) {
                    cluster_names.emplace(cluster_in_instance.cluster_name());
                }
                auto it = cluster_names.find(cluster.cluster.cluster_name());
                LOG(INFO) << "cluster.cluster.cluster_name(): " << cluster.cluster.cluster_name();
                for (auto itt : cluster_names) {
                    LOG(INFO) << "instance's cluster name : " << itt;
                }
                if (it != cluster_names.end()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    ss << "failed to rename cluster, a cluster with the same name already exists "
                          "in this instance "
                       << proto_to_json(c);
                    msg = ss.str();
                    return msg;
                }
                if (c.cluster_name() == cluster.cluster.cluster_name()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    ss << "failed to rename cluster, name eq original name, original cluster is "
                       << proto_to_json(c);
                    msg = ss.str();
                    return msg;
                }
                c.set_cluster_name(cluster.cluster.cluster_name());
                return msg;
            },
            replace);
}

void handle_update_cluster_endpoint(const std::string& instance_id, const ClusterInfo& cluster,
                                    std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                                    MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>&) {
                std::string msg;
                std::stringstream ss;
                if (!cluster.cluster.has_private_endpoint() ||
                    cluster.cluster.private_endpoint().empty()) {
                    code = MetaServiceCode::CLUSTER_ENDPOINT_MISSING;
                    ss << "missing private endpoint";
                    msg = ss.str();
                    return msg;
                }
                c.set_public_endpoint(cluster.cluster.public_endpoint());
                c.set_private_endpoint(cluster.cluster.private_endpoint());
                return msg;
            });
}

void handle_set_cluster_status(const std::string& instance_id, const ClusterInfo& cluster,
                               std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                               MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>&) {
                std::string msg;
                std::stringstream ss;
                if (ClusterPB::COMPUTE != c.type()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    ss << "just support set COMPUTE cluster status";
                    msg = ss.str();
                    return msg;
                }
                if (c.cluster_status() == cluster.cluster.cluster_status()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    ss << "failed to set cluster status, status eq original status, original "
                          "cluster is "
                       << print_cluster_status(c.cluster_status());
                    msg = ss.str();
                    return msg;
                }
                // status from -> to
                std::set<std::pair<cloud::ClusterStatus, cloud::ClusterStatus>>
                        can_work_directed_edges {
                                {ClusterStatus::UNKNOWN, ClusterStatus::NORMAL},
                                {ClusterStatus::NORMAL, ClusterStatus::SUSPENDED},
                                {ClusterStatus::SUSPENDED, ClusterStatus::TO_RESUME},
                                {ClusterStatus::TO_RESUME, ClusterStatus::NORMAL},
                                {ClusterStatus::SUSPENDED, ClusterStatus::NORMAL},
                                {ClusterStatus::NORMAL, ClusterStatus::MANUAL_SHUTDOWN},
                                {ClusterStatus::MANUAL_SHUTDOWN, ClusterStatus::NORMAL},
                        };
                auto from = c.cluster_status();
                auto to = cluster.cluster.cluster_status();
                if (can_work_directed_edges.count({from, to}) == 0) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    ss << "failed to set cluster status, original cluster is "
                       << print_cluster_status(from) << " and want set "
                       << print_cluster_status(to);
                    msg = ss.str();
                    return msg;
                }
                c.set_cluster_status(cluster.cluster.cluster_status());
                return msg;
            });
}

// Alters the sub-cluster list and/or the cluster policy of the stored cluster that has the
// requested cluster id, then runs ResourceManager::validate_virtual_cluster on the result.
//
// Every sub-cluster name taken from the request (cluster_names, the policy's active cluster
// and its first standby cluster) is checked with ResourceManager::validate_sub_clusters
// against the instance's clusters, excluding those that share the altered cluster's name.
// The first failing check aborts the callback with that check's message.
// `msg` receives the string returned by update_cluster; `code` is not modified here.
void handle_alter_vcluster_info(const std::string& instance_id, const ClusterInfo& cluster,
                                std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                                MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& candidate) {
                return candidate.cluster_id() == cluster.cluster.cluster_id();
            },
            [&](ClusterPB& vcluster, std::vector<ClusterPB>& clusters_in_instance) {
                std::string result;
                const auto& requested = cluster.cluster;

                // Take every cluster named like the altered one out of the candidate list.
                auto cursor = clusters_in_instance.begin();
                while (cursor != clusters_in_instance.end()) {
                    if (cursor->cluster_name() == vcluster.cluster_name()) {
                        cursor = clusters_in_instance.erase(cursor);
                    } else {
                        ++cursor;
                    }
                }

                // Replace the sub-cluster names only when the request carries some.
                if (requested.cluster_names_size() > 0) {
                    vcluster.clear_cluster_names();
                    for (const auto& sub_name : requested.cluster_names()) {
                        auto [ret_code, reason] = resource_mgr->validate_sub_clusters(
                                {sub_name}, clusters_in_instance);
                        if (ret_code != MetaServiceCode::OK) {
                            LOG(WARNING) << reason;
                            return reason;
                        }
                        vcluster.add_cluster_names(sub_name);
                    }
                }

                // Merge the policy fields the request provides; absent fields stay untouched.
                if (requested.has_cluster_policy()) {
                    const auto& wanted_policy = requested.cluster_policy();

                    if (wanted_policy.has_active_cluster_name()) {
                        auto [ret_code, reason] = resource_mgr->validate_sub_clusters(
                                {wanted_policy.active_cluster_name()}, clusters_in_instance);
                        if (ret_code != MetaServiceCode::OK) {
                            LOG(WARNING) << reason;
                            return reason;
                        }
                        vcluster.mutable_cluster_policy()->set_active_cluster_name(
                                wanted_policy.active_cluster_name());
                    }

                    // Only a single standby cluster is supported for now, so just the first
                    // requested name is considered.
                    if (wanted_policy.standby_cluster_names_size() > 0) {
                        const auto& standby_name = wanted_policy.standby_cluster_names(0);
                        auto [ret_code, reason] = resource_mgr->validate_sub_clusters(
                                {standby_name}, clusters_in_instance);
                        if (ret_code != MetaServiceCode::OK) {
                            LOG(WARNING) << reason;
                            return reason;
                        }
                        vcluster.mutable_cluster_policy()->clear_standby_cluster_names();
                        vcluster.mutable_cluster_policy()->add_standby_cluster_names(standby_name);
                    }

                    if (wanted_policy.has_type()) {
                        vcluster.mutable_cluster_policy()->set_type(wanted_policy.type());
                    }
                    if (wanted_policy.has_failover_failure_threshold()) {
                        vcluster.mutable_cluster_policy()->set_failover_failure_threshold(
                                wanted_policy.failover_failure_threshold());
                    }
                    if (wanted_policy.has_unhealthy_node_threshold_percent()) {
                        vcluster.mutable_cluster_policy()->set_unhealthy_node_threshold_percent(
                                wanted_policy.unhealthy_node_threshold_percent());
                    }

                    // A non-empty job id list replaces the stored one wholesale.
                    if (!wanted_policy.cache_warmup_jobids().empty()) {
                        vcluster.mutable_cluster_policy()->clear_cache_warmup_jobids();
                        for (const auto& job_id : wanted_policy.cache_warmup_jobids()) {
                            vcluster.mutable_cluster_policy()->add_cache_warmup_jobids(job_id);
                        }
                    }
                }

                // Final consistency check of the altered cluster. The message filled in by the
                // validator is handed back whatever the outcome.
                [[maybe_unused]] const bool valid =
                        resource_mgr->validate_virtual_cluster(vcluster, &result);
                return result;
            });
}

// Replaces the properties of the stored cluster that has the requested cluster id with the
// properties carried by the request.
//
// Only COMPUTE clusters are accepted; for any other type the callback sets `code` to
// MetaServiceCode::INVALID_ARGUMENT and returns an explanatory message.
// `msg` receives the string returned by ResourceManager::update_cluster.
void handle_alter_properties(const std::string& instance_id, const ClusterInfo& cluster,
                             std::shared_ptr<ResourceManager> resource_mgr, std::string& msg,
                             MetaServiceCode& code) {
    msg = resource_mgr->update_cluster(
            instance_id, cluster,
            [&](const ClusterPB& i) { return i.cluster_id() == cluster.cluster.cluster_id(); },
            [&](ClusterPB& c, std::vector<ClusterPB>&) -> std::string {
                if (ClusterPB::COMPUTE != c.type()) {
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    // The previous text talked about "cluster status", a leftover from the
                    // status handler; this operation alters properties.
                    return "just support alter COMPUTE cluster properties";
                }
                *c.mutable_properties() = cluster.cluster.properties();
                return {};
            });
}

// RPC entry point that applies one AlterClusterRequest operation to an instance.
//
// The target instance is taken from the request's instance_id or, when that is absent,
// resolved from cloud_unique_id. The request must carry both a cluster and an op; the op is
// dispatched to the matching handle_* function. When the operation succeeds, a refresh
// notification for the instance is issued through async_notify_refresh_instance.
//
// `code`, `msg`, `ss` and `instance_id` are used without a local declaration here; they are
// presumably introduced by RPC_PREPROCESS -- confirm against the macro.
void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
                                    const AlterClusterRequest* request,
                                    AlterClusterResponse* response,
                                    ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(alter_cluster, get);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = request->has_instance_id() ? request->instance_id() : "";

    // No explicit instance_id: derive it from cloud_unique_id.
    if (!cloud_unique_id.empty() && instance_id.empty()) {
        auto [is_degraded_format, id] =
                ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
        if (config::enable_check_instance_id && is_degraded_format &&
            !resource_mgr_->is_instance_id_registered(id)) {
            msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
                  cloud_unique_id;
            LOG(WARNING) << msg;
            code = MetaServiceCode::INVALID_ARGUMENT;
            return;
        }
        instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
        if (instance_id.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "empty instance_id";
            LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
            return;
        }
    }

    if (instance_id.empty() || !request->has_cluster()) {
        msg = "invalid request instance_id or cluster not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    if (!request->has_op()) {
        msg = "op not given";
        code = MetaServiceCode::INVALID_ARGUMENT;
        return;
    }

    LOG(INFO) << "alter cluster instance_id=" << instance_id << " op=" << request->op();
    ClusterInfo cluster;
    cluster.cluster.CopyFrom(request->cluster());

    switch (request->op()) {
    case AlterClusterRequest::ADD_CLUSTER:
        handle_add_cluster(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::DROP_CLUSTER:
        handle_drop_cluster(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME:
        handle_update_cluster_mySQL_username(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::ADD_NODE:
        handle_add_node(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::DROP_NODE:
        handle_drop_node(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::DECOMMISSION_NODE:
        handle_decommission_node(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::NOTIFY_DECOMMISSIONED:
        handle_notify_decommissioned(instance_id, request, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::RENAME_CLUSTER: {
        // In SQL mode the target name may belong to an existing empty cluster, which has to be
        // dropped first; with the HTTP API, cloud control drops the empty cluster itself.
        bool replace_if_existing_empty_target_cluster =
                request->has_replace_if_existing_empty_target_cluster()
                        ? request->replace_if_existing_empty_target_cluster()
                        : false;
        handle_rename_cluster(instance_id, cluster, resource_mgr(),
                              replace_if_existing_empty_target_cluster, msg, code);
        break;
    }
    case AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT:
        handle_update_cluster_endpoint(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::SET_CLUSTER_STATUS:
        handle_set_cluster_status(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::ALTER_VCLUSTER_INFO:
        handle_alter_vcluster_info(instance_id, cluster, resource_mgr(), msg, code);
        break;
    case AlterClusterRequest::ALTER_PROPERTIES:
        handle_alter_properties(instance_id, cluster, resource_mgr(), msg, code);
        break;
    default:
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "invalid request op, op=" << request->op();
        msg = ss.str();
        return;
    }

    // A handler that reported a message without setting a code still counts as a failure.
    if (!msg.empty() && code == MetaServiceCode::OK) {
        code = MetaServiceCode::INVALID_ARGUMENT;
    }

    // Ugly but easy to repair: cloud.proto is left without a dedicated error code, so a
    // DROP_NODE failure is recognised by its message text.
    if (request->op() == AlterClusterRequest::DROP_NODE &&
        msg.find("not found") != std::string::npos) {
        // see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return http status code 404
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
    }

    if (code != MetaServiceCode::OK) return;

    // Use the resolved instance_id rather than request->instance_id(): the latter is empty when
    // the caller identified the instance through cloud_unique_id only, which would send the
    // refresh notification (and the metrics below) to an empty instance id.
    async_notify_refresh_instance(
            txn_kv_, instance_id, /*include_self=*/false,
            [instance_id = instance_id](const KVStats& stats) {
                if (config::use_detailed_metrics && !instance_id.empty()) {
                    g_bvar_rpc_kv_alter_cluster_get_bytes.put({instance_id}, stats.get_bytes);
                    g_bvar_rpc_kv_alter_cluster_get_counter.put({instance_id}, stats.get_counter);
                }
            });
}

// RPC entry point that returns cluster information of an instance.
//
// The instance is resolved from cloud_unique_id (mandatory); if that fails, the request's
// instance_id is used when present. Exactly one selector is honoured, in this priority order:
// cluster_id, cluster_name, mysql_user_name. When none is given, all clusters of the instance
// are returned. An empty result is reported as MetaServiceCode::CLUSTER_NOT_FOUND.
//
// This RPC only reads the instance record; nothing is written back.
void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
                                  const GetClusterRequest* request, GetClusterResponse* response,
                                  ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_cluster, get, put);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    std::string cluster_id = request->has_cluster_id() ? request->cluster_id() : "";
    std::string cluster_name = request->has_cluster_name() ? request->cluster_name() : "";
    std::string mysql_user_name = request->has_mysql_user_name() ? request->mysql_user_name() : "";

    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud_unique_id must be given";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        if (request->has_instance_id()) {
            instance_id = request->instance_id();
            // FIXME(gavin): this mechanism benefits debugging and
            //               administration, is it dangerous?
            LOG(WARNING) << "failed to get instance_id with cloud_unique_id=" << cloud_unique_id
                         << " use the given instance_id=" << instance_id << " instead";
        } else {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "empty instance_id";
            LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
            return;
        }
    }
    RPC_RATE_LIMIT(get_cluster)
    // ATTN: when several selectors are given, only the first one in this order is used:
    // cluster_id -> cluster_name -> mysql_user_name
    if (!cluster_id.empty()) {
        cluster_name = "";
        mysql_user_name = "";
    } else if (!cluster_name.empty()) {
        mysql_user_name = "";
    }

    // With no selector at all, every cluster of the instance is returned.
    const bool get_all_cluster_info =
            cluster_id.empty() && cluster_name.empty() && mysql_user_name.empty();

    // Reached with an empty id only if the request carried an explicitly empty instance_id.
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "failed to get instance_id with cloud_unique_id=" + cloud_unique_id;
        return;
    }

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    response->set_enable_storage_vault(instance.enable_storage_vault());
    if (instance.enable_storage_vault() &&
        std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(),
                     [](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) ==
                instance.storage_vault_names().end()) {
        LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance "
                               << instance.instance_id();
    }

    if (get_all_cluster_info) {
        response->mutable_cluster()->CopyFrom(instance.clusters());
        LOG_EVERY_N(INFO, 100) << "get all cluster info, " << msg;
    } else {
        // Every cluster matching the active selector is appended to the response.
        for (const auto& c : instance.clusters()) {
            const bool name_matches = c.has_cluster_name() && c.cluster_name() == cluster_name;
            const bool id_matches = c.has_cluster_id() && c.cluster_id() == cluster_id;
            // Search the repeated field directly instead of copying it into a set first.
            const auto& users = c.mysql_user_name();
            const bool user_matches =
                    std::find(users.begin(), users.end(), mysql_user_name) != users.end();
            if (name_matches || id_matches || user_matches) {
                response->add_cluster()->CopyFrom(c);
                LOG_EVERY_N(INFO, 100) << "found a cluster, instance_id=" << instance.instance_id()
                                       << " cluster=" << msg;
            }
        }
    }

    if (response->cluster().empty()) {
        ss << "fail to get cluster with " << request->ShortDebugString();
        msg = ss.str();
        std::replace(msg.begin(), msg.end(), '\n', ' ');
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
    }
} // get_cluster

// RPC entry point that adds a stage (INTERNAL or EXTERNAL) to an instance record.
//
// Validation performed before anything is written:
//   * cloud_unique_id resolves to an instance, the stage has a type and a stage id,
//     EXTERNAL stages have a name, INTERNAL stages have a mysql user name and id;
//   * the instance holds fewer than config::max_num_stages stages;
//   * no existing stage clashes: same user id/name among INTERNAL stages, same name for an
//     EXTERNAL request (ALREADY_EXISTED), or same stage id (INVALID_ARGUMENT).
//
// INTERNAL stages get an object prefix "${obj_prefix}/stage/{username}/{user_id}" built from the
// instance's last obj_info. EXTERNAL stages carrying obj_info have their ak/sk passed through
// encrypt_ak_sk_helper before being stored. The updated instance is written and committed in
// one transaction.
void MetaServiceImpl::create_stage(::google::protobuf::RpcController* controller,
                                   const CreateStageRequest* request, CreateStageResponse* response,
                                   ::google::protobuf::Closure* done) {
    TEST_SYNC_POINT_CALLBACK("create_stage_sk_request", const_cast<CreateStageRequest**>(&request));
    RPC_PREPROCESS(create_stage, get, put);
    TEST_SYNC_POINT_RETURN_WITH_VOID("create_stage_sk_request_return");
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(create_stage)

    if (!request->has_stage()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage not set";
        return;
    }
    // Read-only view of the requested stage; the request outlives this function body.
    const auto& stage = request->stage();

    if (!stage.has_type()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage type not set";
        return;
    }

    if (stage.name().empty() && stage.type() == StagePB::EXTERNAL) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage name not set";
        return;
    }
    if (stage.stage_id().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage id not set";
        return;
    }

    if (stage.type() == StagePB::INTERNAL) {
        if (stage.mysql_user_name().empty() || stage.mysql_user_id().empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "internal stage must have a mysql user name and id must be given, name size="
               << stage.mysql_user_name_size() << " id size=" << stage.mysql_user_id_size();
            msg = ss.str();
            LOG(WARNING) << msg;
            return;
        }
    }

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    VLOG_DEBUG << "config stages num=" << config::max_num_stages;
    if (instance.stages_size() >= config::max_num_stages) {
        code = MetaServiceCode::UNDEFINED_ERR;
        msg = "this instance has greater than config num stages";
        LOG(WARNING) << "can't create more than config num stages, and instance has "
                     << std::to_string(instance.stages_size());
        return;
    }

    // Check whether the stage clashes with an existing one.
    for (const auto& s : instance.stages()) {
        if (stage.type() == StagePB::INTERNAL && s.type() == StagePB::INTERNAL) {
            const bool existing_has_id = s.mysql_user_id_size() > 0;
            const bool existing_has_name = s.mysql_user_name_size() > 0;
            // Every stored internal stage is expected to carry a user id.
            if (!existing_has_id) {
                LOG(WARNING) << "impossible, internal stage must have at least one id instance="
                             << proto_to_json(instance);
            }

            // Index element 0 only when it exists, so a malformed stored stage cannot cause an
            // out-of-range access on the repeated fields.
            const bool same_user_id =
                    existing_has_id && s.mysql_user_id(0) == stage.mysql_user_id(0);
            const bool same_user_name =
                    existing_has_name && s.mysql_user_name(0) == stage.mysql_user_name(0);
            if (same_user_id || same_user_name) {
                code = MetaServiceCode::ALREADY_EXISTED;
                ss << "stage already exist, req user_name=" << stage.mysql_user_name(0)
                   << " existed user_name="
                   << (existing_has_name ? s.mysql_user_name(0) : std::string())
                   << " req user_id=" << stage.mysql_user_id(0) << " existed user_id="
                   << (existing_has_id ? s.mysql_user_id(0) : std::string());
                // Report the detailed text; it keeps the "stage already exist" prefix.
                msg = ss.str();
                return;
            }
        }

        if (stage.type() == StagePB::EXTERNAL) {
            if (s.name() == stage.name()) {
                code = MetaServiceCode::ALREADY_EXISTED;
                msg = "stage already exist";
                return;
            }
        }

        if (s.stage_id() == stage.stage_id()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "stage id is duplicated";
            return;
        }
    }

    if (stage.type() == StagePB::INTERNAL) {
        if (instance.obj_info_size() == 0) {
            LOG(WARNING) << "impossible, instance must have at least one obj_info.";
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "impossible, instance must have at least one obj_info.";
            return;
        }
        // The internal stage lives under the most recently added obj_info.
        const auto& latest_obj = instance.obj_info(instance.obj_info_size() - 1);
        // ${obj_prefix}/stage/{username}/{user_id}
        std::string mysql_user_name = stage.mysql_user_name(0);
        std::string prefix = fmt::format("{}/stage/{}/{}", latest_obj.prefix(), mysql_user_name,
                                         stage.mysql_user_id(0));
        auto* new_stage = instance.add_stages();
        new_stage->mutable_obj_info()->set_prefix(prefix);
        new_stage->mutable_obj_info()->set_id(latest_obj.id());
        new_stage->add_mysql_user_name(mysql_user_name);
        new_stage->add_mysql_user_id(stage.mysql_user_id(0));
        new_stage->set_stage_id(stage.stage_id());
    } else if (stage.type() == StagePB::EXTERNAL) {
        if (!stage.has_obj_info()) {
            instance.add_stages()->CopyFrom(stage);
        } else {
            // Store the stage with its ak/sk replaced by the output of encrypt_ak_sk_helper.
            StagePB tmp_stage;
            tmp_stage.CopyFrom(stage);
            auto obj_info = tmp_stage.mutable_obj_info();
            EncryptionInfoPB encryption_info;
            AkSkPair cipher_ak_sk_pair;
            if (encrypt_ak_sk_helper(obj_info->ak(), obj_info->sk(), &encryption_info,
                                     &cipher_ak_sk_pair, code, msg) != 0) {
                return;
            }
            obj_info->set_ak(std::move(cipher_ak_sk_pair.first));
            obj_info->set_sk(std::move(cipher_ak_sk_pair.second));
            obj_info->mutable_encryption_info()->CopyFrom(encryption_info);
            instance.add_stages()->CopyFrom(tmp_stage);
        }
    }
    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key)
              << " json=" << proto_to_json(instance);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}

// Handles the get_stage RPC: looks up stage(s) of the instance that `cloud_unique_id`
// resolves to.
//
// - INTERNAL: requires `mysql_user_name` and `mysql_user_id`. Returns the internal stage whose
//   first user name matches, with ak/sk/bucket/endpoint/region/provider taken from the instance
//   obj_info the stage refers to (ak/sk are decrypted when the obj_info has encryption info).
//   If the name matches but the user id differs, the stored stage is returned together with
//   STATE_ALREADY_EXISTED_FOR_USER.
// - EXTERNAL without `stage_name`: returns every external stage as stored (no decryption).
// - Otherwise: returns the stage of the requested type with the requested name, filling ak/sk
//   according to its access type (AKSK / BUCKET_ACL / IAM).
//
// `code` is set to STAGE_NOT_FOUND when nothing matches.
void MetaServiceImpl::get_stage(google::protobuf::RpcController* controller,
                                const GetStageRequest* request, GetStageResponse* response,
                                ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_stage, get);
    TEST_SYNC_POINT_CALLBACK("stage_sk_response", &response);
    TEST_SYNC_POINT_RETURN_WITH_VOID("stage_sk_response_return");
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_stage)
    if (!request->has_type()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage type not set";
        return;
    }
    auto type = request->type();

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);

    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    if (type == StagePB::INTERNAL) {
        auto mysql_user_name = request->has_mysql_user_name() ? request->mysql_user_name() : "";
        if (mysql_user_name.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "mysql user name not set";
            return;
        }
        auto mysql_user_id = request->has_mysql_user_id() ? request->mysql_user_id() : "";
        if (mysql_user_id.empty()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            msg = "mysql user id not set";
            return;
        }

        // check mysql user_name has been created internal stage
        if (instance.obj_info_size() == 0) {
            LOG(WARNING) << "impossible, instance must have at least one obj_info.";
            code = MetaServiceCode::UNDEFINED_ERR;
            msg = "impossible, instance must have at least one obj_info.";
            return;
        }

        // Iterate by const reference: copying a StagePB per iteration is unnecessary.
        for (const auto& s : instance.stages()) {
            if (s.type() != StagePB::INTERNAL) {
                continue;
            }
            if (s.mysql_user_name().size() == 0 || s.mysql_user_id().size() == 0) {
                LOG(WARNING) << "impossible here, internal stage must have at least one user, "
                                "invalid stage="
                             << proto_to_json(s);
                continue;
            }
            if (s.mysql_user_name(0) != mysql_user_name) {
                continue;
            }

            // internal stage id is user_id, if user_id not eq internal stage's user_id, del it.
            // let fe create a new internal stage
            if (s.mysql_user_id(0) != mysql_user_id) {
                LOG(INFO) << "ABA user=" << mysql_user_name
                          << " internal stage original user_id=" << s.mysql_user_id(0)
                          << " rpc user_id=" << mysql_user_id
                          << " stage info=" << proto_to_json(s);
                code = MetaServiceCode::STATE_ALREADY_EXISTED_FOR_USER;
                msg = "aba user, drop stage and create a new one";
                // response return to be dropped stage id.
                response->add_stage()->CopyFrom(s);
                return;
            }

            // Found the user's stage: resolve the obj_info it points at. The stage stores the
            // 1-based position of the obj_info as a decimal string; parse it without throwing
            // so that a malformed stored id becomes an RPC error instead of an exception
            // escaping the handler.
            const std::string& obj_id = s.obj_info().id();
            int idx = 0;
            const auto parsed = std::from_chars(obj_id.data(), obj_id.data() + obj_id.size(), idx);
            if (parsed.ec != std::errc()) {
                LOG(WARNING) << "invalid obj_info id: " << obj_id;
                code = MetaServiceCode::UNDEFINED_ERR;
                msg = "impossible, id invalid";
                return;
            }
            if (idx > instance.obj_info().size() || idx < 1) {
                LOG(WARNING) << "invalid idx: " << idx;
                code = MetaServiceCode::UNDEFINED_ERR;
                msg = "impossible, id invalid";
                return;
            }
            auto& old_obj = instance.obj_info()[idx - 1];

            StagePB stage_pb;
            stage_pb.mutable_obj_info()->set_ak(old_obj.ak());
            stage_pb.mutable_obj_info()->set_sk(old_obj.sk());
            if (old_obj.has_encryption_info()) {
                // Stored ak/sk are ciphertext; overwrite them with the decrypted pair.
                AkSkPair plain_ak_sk_pair;
                int ret = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(),
                                               old_obj.encryption_info(), &plain_ak_sk_pair, code,
                                               msg);
                if (ret != 0) return;
                stage_pb.mutable_obj_info()->set_ak(std::move(plain_ak_sk_pair.first));
                stage_pb.mutable_obj_info()->set_sk(std::move(plain_ak_sk_pair.second));
            }
            stage_pb.mutable_obj_info()->set_bucket(old_obj.bucket());
            stage_pb.mutable_obj_info()->set_endpoint(old_obj.endpoint());
            stage_pb.mutable_obj_info()->set_external_endpoint(old_obj.external_endpoint());
            stage_pb.mutable_obj_info()->set_region(old_obj.region());
            stage_pb.mutable_obj_info()->set_provider(old_obj.provider());
            // The prefix comes from the stage itself, not from the instance obj_info.
            stage_pb.mutable_obj_info()->set_prefix(s.obj_info().prefix());
            stage_pb.set_stage_id(s.stage_id());
            stage_pb.set_type(s.type());
            response->add_stage()->CopyFrom(stage_pb);
            return;
        }

        // Every match above returns, so reaching this point means no stage was found.
        LOG(INFO) << "user=" << mysql_user_name
                  << " not have a valid stage, rpc user_id=" << mysql_user_id;
        code = MetaServiceCode::STAGE_NOT_FOUND;
        msg = "stage not found, create a new one";
        return;
    }

    // get all external stages for display, but don't show ak/sk, so there is no need to decrypt ak/sk.
    if (type == StagePB::EXTERNAL && !request->has_stage_name()) {
        for (int i = 0; i < instance.stages_size(); ++i) {
            auto& s = instance.stages(i);
            if (s.type() != StagePB::EXTERNAL) {
                continue;
            }
            response->add_stage()->CopyFrom(s);
        }
        return;
    }

    // get external stage with the specified stage name
    for (int i = 0; i < instance.stages_size(); ++i) {
        auto& s = instance.stages(i);
        if (s.type() == type && s.name() == request->stage_name()) {
            StagePB stage;
            stage.CopyFrom(s);
            if (!stage.has_access_type() || stage.access_type() == StagePB::AKSK) {
                // A stage without an access type is reported as AKSK.
                stage.set_access_type(StagePB::AKSK);
                auto obj_info = stage.mutable_obj_info();
                if (obj_info->has_encryption_info()) {
                    AkSkPair plain_ak_sk_pair;
                    int ret = decrypt_ak_sk_helper(obj_info->ak(), obj_info->sk(),
                                                   obj_info->encryption_info(), &plain_ak_sk_pair,
                                                   code, msg);
                    if (ret != 0) return;
                    obj_info->set_ak(std::move(plain_ak_sk_pair.first));
                    obj_info->set_sk(std::move(plain_ak_sk_pair.second));
                }
            } else if (stage.access_type() == StagePB::BUCKET_ACL) {
                // BUCKET_ACL stages are served with the instance's RAM user credentials.
                if (!instance.has_ram_user()) {
                    ss << "instance does not have ram user";
                    msg = ss.str();
                    code = MetaServiceCode::INVALID_ARGUMENT;
                    return;
                }
                if (instance.ram_user().has_encryption_info()) {
                    AkSkPair plain_ak_sk_pair;
                    int ret = decrypt_ak_sk_helper(
                            instance.ram_user().ak(), instance.ram_user().sk(),
                            instance.ram_user().encryption_info(), &plain_ak_sk_pair, code, msg);
                    if (ret != 0) return;
                    stage.mutable_obj_info()->set_ak(std::move(plain_ak_sk_pair.first));
                    stage.mutable_obj_info()->set_sk(std::move(plain_ak_sk_pair.second));
                } else {
                    stage.mutable_obj_info()->set_ak(instance.ram_user().ak());
                    stage.mutable_obj_info()->set_sk(instance.ram_user().sk());
                }
            } else if (stage.access_type() == StagePB::IAM) {
                // IAM stages are served with the arn info stored in the kv store, falling back
                // to the values from config when the key does not exist.
                std::string arn_val;
                TxnErrorCode arn_err = txn->get(system_meta_service_arn_info_key(), &arn_val);
                if (arn_err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                    // For compatibility, use arn_info of config
                    stage.mutable_obj_info()->set_ak(config::arn_ak);
                    stage.mutable_obj_info()->set_sk(config::arn_sk);
                    stage.set_external_id(instance_id);
                } else if (arn_err == TxnErrorCode::TXN_OK) {
                    RamUserPB iam_user;
                    if (!iam_user.ParseFromString(arn_val)) {
                        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                        msg = "failed to parse RamUserPB";
                        return;
                    }
                    AkSkPair plain_ak_sk_pair;
                    int ret = decrypt_ak_sk_helper(iam_user.ak(), iam_user.sk(),
                                                   iam_user.encryption_info(), &plain_ak_sk_pair,
                                                   code, msg);
                    if (ret != 0) return;
                    stage.mutable_obj_info()->set_ak(std::move(plain_ak_sk_pair.first));
                    stage.mutable_obj_info()->set_sk(std::move(plain_ak_sk_pair.second));
                    stage.set_external_id(instance_id);
                } else {
                    code = cast_as<ErrCategory::READ>(arn_err);
                    ss << "failed to get arn_info_key, err=" << arn_err;
                    msg = ss.str();
                    return;
                }
            }
            response->add_stage()->CopyFrom(stage);
            return;
        }
    }

    ss << "stage not found with " << proto_to_json(*request);
    msg = ss.str();
    code = MetaServiceCode::STAGE_NOT_FOUND;
}

// Handles the drop_stage RPC: removes one stage from the instance that `cloud_unique_id`
// resolves to and writes the updated instance back in a single transaction.
//
// - EXTERNAL stages are matched by `stage_name` (must be non-empty).
// - INTERNAL stages are matched by the stage's first `mysql_user_id` (request value must be
//   non-empty); a RecycleStagePB carrying the dropped stage and the request's `reason` is
//   additionally written under the recycle stage key.
//
// `code` is set to STAGE_NOT_FOUND when no stage matches.
void MetaServiceImpl::drop_stage(google::protobuf::RpcController* controller,
                                 const DropStageRequest* request, DropStageResponse* response,
                                 ::google::protobuf::Closure* done) {
    StopWatch sw;
    auto ctrl = static_cast<brpc::Controller*>(controller);
    LOG(INFO) << "rpc from " << ctrl->remote_side() << " request=" << request->DebugString();
    brpc::ClosureGuard closure_guard(done);
    MetaServiceCode code = MetaServiceCode::OK;
    std::string msg = "OK";
    std::string instance_id;
    bool drop_request = false;
    DORIS_CLOUD_DEFER {
        response->mutable_status()->set_code(code);
        response->mutable_status()->set_msg(msg);
        // Report success/failure from the final status code so the log line matches the
        // status that is sent back to the caller.
        LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
                  << __PRETTY_FUNCTION__ << " " << ctrl->remote_side() << " " << msg;
        closure_guard.reset(nullptr);
        if (config::use_detailed_metrics && !instance_id.empty() && !drop_request) {
            g_bvar_ms_drop_stage.put(instance_id, sw.elapsed_us());
        }
    };

    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(drop_stage)

    if (!request->has_type()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "stage type not set";
        return;
    }
    auto type = request->type();

    if (type == StagePB::EXTERNAL && request->stage_name().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "external stage but not set stage name";
        return;
    }

    if (type == StagePB::INTERNAL && request->mysql_user_id().empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "internal stage but not set user id";
        return;
    }

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    std::string val;
    instance_key(key_info, &key);

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    err = txn->get(key, &val);
    LOG(INFO) << "get instance_key=" << hex(key);
    std::stringstream ss;
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    // Locate the first stage matching the request.
    StagePB stage;
    int idx = -1;
    for (int i = 0; i < instance.stages_size(); ++i) {
        auto& s = instance.stages(i);
        // An internal stage with no user id cannot match; checking the size first avoids
        // indexing element 0 of an empty repeated field.
        const bool internal_match = type == StagePB::INTERNAL && s.type() == StagePB::INTERNAL &&
                                    s.mysql_user_id_size() > 0 &&
                                    s.mysql_user_id(0) == request->mysql_user_id();
        const bool external_match = type == StagePB::EXTERNAL && s.type() == StagePB::EXTERNAL &&
                                    s.name() == request->stage_name();
        if (internal_match || external_match) {
            idx = i;
            stage = s;
            break;
        }
    }
    if (idx == -1) {
        ss << "stage not found with " << proto_to_json(*request);
        msg = ss.str();
        code = MetaServiceCode::STAGE_NOT_FOUND;
        return;
    }

    instance.mutable_stages()->DeleteSubrange(idx, 1); // Remove it
    val = instance.SerializeAsString();
    if (val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }
    txn->put(key, val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key)
              << " json=" << proto_to_json(instance);

    // Internal stages additionally get a recycle record holding the dropped stage.
    std::string key1;
    std::string val1;
    if (type == StagePB::INTERNAL) {
        RecycleStageKeyInfo recycle_stage_key_info {instance_id, stage.stage_id()};
        recycle_stage_key(recycle_stage_key_info, &key1);
        RecycleStagePB recycle_stage;
        recycle_stage.set_instance_id(instance_id);
        recycle_stage.set_reason(request->reason());
        recycle_stage.mutable_stage()->CopyFrom(stage);
        val1 = recycle_stage.SerializeAsString();
        if (val1.empty()) {
            msg = "failed to serialize";
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            return;
        }
        txn->put(key1, val1);
    }

    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}

// Handles the get_iam RPC.
//
// Fills `response->iam_user` from the arn info stored under
// system_meta_service_arn_info_key() (ak/sk decrypted), or from config::arn_* when that key
// does not exist. If the instance has a RAM user it is copied into `response->ram_user`,
// with ak/sk decrypted when the RAM user carries encryption info.
void MetaServiceImpl::get_iam(google::protobuf::RpcController* controller,
                              const GetIamRequest* request, GetIamResponse* response,
                              ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_iam, get);
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_iam)

    // Build the key of the instance record.
    std::string inst_key;
    InstanceKeyInfo inst_key_info {instance_id};
    instance_key(inst_key_info, &inst_key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    // Load and decode the instance.
    std::string inst_val;
    err = txn->get(inst_key, &inst_val);
    LOG(INFO) << "get instance_key=" << hex(inst_key);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    InstanceInfoPB instance;
    if (!instance.ParseFromString(inst_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }

    // Resolve the IAM user: stored arn info first, config values as fallback.
    std::string arn_val;
    err = txn->get(system_meta_service_arn_info_key(), &arn_val);
    if (err == TxnErrorCode::TXN_OK) {
        RamUserPB stored_iam;
        if (!stored_iam.ParseFromString(arn_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            msg = "failed to parse RamUserPB";
            return;
        }
        AkSkPair plain_pair;
        if (decrypt_ak_sk_helper(stored_iam.ak(), stored_iam.sk(), stored_iam.encryption_info(),
                                 &plain_pair, code, msg) != 0) {
            return;
        }
        stored_iam.set_external_id(instance_id);
        stored_iam.set_ak(std::move(plain_pair.first));
        stored_iam.set_sk(std::move(plain_pair.second));
        response->mutable_iam_user()->CopyFrom(stored_iam);
    } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        // For compatibility, use arn_info of config
        RamUserPB config_iam;
        config_iam.set_user_id(config::arn_id);
        config_iam.set_external_id(instance_id);
        config_iam.set_ak(config::arn_ak);
        config_iam.set_sk(config::arn_sk);
        response->mutable_iam_user()->CopyFrom(config_iam);
    } else {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get arn_info_key, err=" << err;
        msg = ss.str();
        return;
    }

    // The RAM user is optional; nothing more to do when the instance has none.
    if (!instance.has_ram_user()) {
        return;
    }
    RamUserPB plain_ram_user(instance.ram_user());
    if (plain_ram_user.has_encryption_info()) {
        AkSkPair plain_pair;
        if (decrypt_ak_sk_helper(plain_ram_user.ak(), plain_ram_user.sk(),
                                 plain_ram_user.encryption_info(), &plain_pair, code,
                                 msg) != 0) {
            return;
        }
        plain_ram_user.set_ak(std::move(plain_pair.first));
        plain_ram_user.set_sk(std::move(plain_pair.second));
    }
    response->mutable_ram_user()->CopyFrom(plain_ram_user);
}

// Handles the alter_iam RPC: stores (or replaces) the arn info kept under
// system_meta_service_arn_info_key().
//
// The request must carry a non-empty account id, ak and sk. The ak/sk are encrypted via
// encrypt_ak_sk_helper() before being written. When a record already exists and its user id
// and ciphertext ak/sk equal the new ones, the request is rejected with INVALID_ARGUMENT.
void MetaServiceImpl::alter_iam(google::protobuf::RpcController* controller,
                                const AlterIamRequest* request, AlterIamResponse* response,
                                ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(alter_iam, get, put);
    std::string arn_id = request->has_account_id() ? request->account_id() : "";
    std::string arn_ak = request->has_ak() ? request->ak() : "";
    std::string arn_sk = request->has_sk() ? request->sk() : "";
    if (arn_id.empty() || arn_ak.empty() || arn_sk.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "invalid argument";
        return;
    }
    RPC_RATE_LIMIT(alter_iam)

    // for metric, give it a common instance id
    instance_id = "alter_iam_instance";

    std::string key = system_meta_service_arn_info_key();
    std::string val;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    err = txn->get(key, &val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get arn_info_key, err=" << err;
        msg = ss.str();
        return;
    }

    // A missing key means this request creates the record rather than altering it.
    const bool is_add_req = err == TxnErrorCode::TXN_KEY_NOT_FOUND;
    EncryptionInfoPB encryption_info;
    AkSkPair cipher_ak_sk_pair;
    if (encrypt_ak_sk_helper(arn_ak, arn_sk, &encryption_info, &cipher_ak_sk_pair, code, msg) !=
        0) {
        return;
    }
    RamUserPB iam_user;
    std::string old_ak;
    std::string old_sk;
    if (!is_add_req) {
        if (!iam_user.ParseFromString(val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "failed to parse RamUserPB";
            msg = ss.str();
            return;
        }

        // NOTE(review): this compares freshly produced ciphertext with the stored ciphertext,
        // so it presumably relies on encrypt_ak_sk_helper being deterministic for identical
        // input -- confirm.
        if (arn_id == iam_user.user_id() && cipher_ak_sk_pair.first == iam_user.ak() &&
            cipher_ak_sk_pair.second == iam_user.sk()) {
            code = MetaServiceCode::INVALID_ARGUMENT;
            ss << "already has the same arn info";
            msg = ss.str();
            return;
        }
        // Keep the previous ciphertext only for the log line written after commit.
        old_ak = iam_user.ak();
        old_sk = iam_user.sk();
    }
    iam_user.set_user_id(arn_id);
    iam_user.set_ak(std::move(cipher_ak_sk_pair.first));
    iam_user.set_sk(std::move(cipher_ak_sk_pair.second));
    iam_user.mutable_encryption_info()->CopyFrom(encryption_info);
    val = iam_user.SerializeAsString();
    if (val.empty()) {
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        ss << "failed to serialize";
        msg = ss.str();
        return;
    }
    txn->put(key, val);
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        ss << "txn->commit failed() err=" << err;
        msg = ss.str();
        return;
    }
    if (is_add_req) {
        LOG(INFO) << "add new iam info, cipher ak: " << iam_user.ak()
                  << " cipher sk: " << iam_user.sk();
    } else {
        LOG(INFO) << "alter iam info, old: cipher ak: " << old_ak << " cipher sk: " << old_sk
                  << " new: cipher ak: " << iam_user.ak() << " cipher sk: " << iam_user.sk();
    }
}

// Handles the alter_ram_user RPC: sets the RAM user of the instance named by
// `request->instance_id()`.
//
// The request must carry a RAM user with non-empty user id, ak and sk. The ak/sk are
// encrypted via encrypt_ak_sk_helper() and stored together with the encryption info.
// Instances whose status is DELETED are rejected with CLUSTER_NOT_FOUND. An existing RAM
// user is overwritten (a warning is logged).
void MetaServiceImpl::alter_ram_user(google::protobuf::RpcController* controller,
                                     const AlterRamUserRequest* request,
                                     AlterRamUserResponse* response,
                                     ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(alter_ram_user, get, put);
    if (request->has_instance_id()) {
        instance_id = request->instance_id();
    } else {
        instance_id = "";
    }
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        return;
    }

    const bool ram_user_ok = request->has_ram_user() && !request->ram_user().user_id().empty() &&
                             !request->ram_user().ak().empty() &&
                             !request->ram_user().sk().empty();
    if (!ram_user_ok) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "ram user info err " + proto_to_json(*request);
        return;
    }
    const auto& requested_user = request->ram_user();
    RPC_RATE_LIMIT(alter_ram_user)

    std::string inst_key;
    InstanceKeyInfo inst_key_info {instance_id};
    instance_key(inst_key_info, &inst_key);

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    // Load and decode the current instance record.
    std::string inst_val;
    err = txn->get(inst_key, &inst_val);
    LOG(INFO) << "get instance_key=" << hex(inst_key);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get instance, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }
    InstanceInfoPB instance;
    if (!instance.ParseFromString(inst_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse InstanceInfoPB";
        return;
    }
    if (instance.status() == InstanceInfoPB::DELETED) {
        code = MetaServiceCode::CLUSTER_NOT_FOUND;
        msg = "instance status has been set delete, plz check it";
        return;
    }
    if (instance.has_ram_user()) {
        LOG(WARNING) << "instance has ram user. instance_id=" << instance_id
                     << ", ram_user_id=" << requested_user.user_id();
    }

    // Encrypt the credentials before they are persisted.
    EncryptionInfoPB enc_info;
    AkSkPair cipher_pair;
    const int enc_ret = encrypt_ak_sk_helper(requested_user.ak(), requested_user.sk(), &enc_info,
                                             &cipher_pair, code, msg);
    if (enc_ret != 0) {
        return;
    }

    // Start from the requested user and replace the plaintext ak/sk with ciphertext.
    RamUserPB stored_user(requested_user);
    stored_user.set_user_id(requested_user.user_id());
    stored_user.set_ak(std::move(cipher_pair.first));
    stored_user.set_sk(std::move(cipher_pair.second));
    stored_user.mutable_encryption_info()->CopyFrom(enc_info);
    instance.mutable_ram_user()->CopyFrom(stored_user);

    inst_val = instance.SerializeAsString();
    if (inst_val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }
    txn->atomic_add(system_meta_service_instance_update_key(), 1);
    txn->put(inst_key, inst_val);
    LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(inst_key);
    err = txn->commit();
    if (err == TxnErrorCode::TXN_OK) {
        return;
    }
    code = cast_as<ErrCategory::COMMIT>(err);
    msg = fmt::format("failed to commit kv txn, err={}", err);
    LOG(WARNING) << msg;
}

// Handles the begin_copy RPC: registers a copy job in LOADING state together with one copy
// file record per accepted object file.
//
// Files that already have a copy file record are skipped. Remaining files are accepted in
// request order until one of the positive limits in the request (file count, total file
// size, total serialized file meta size) would be exceeded; the first new file is always
// accepted. Accepted files are echoed in `response->filtered_object_files`. Nothing is
// written when the request contains no new file.
void MetaServiceImpl::begin_copy(google::protobuf::RpcController* controller,
                                 const BeginCopyRequest* request, BeginCopyResponse* response,
                                 ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(begin_copy, get, put);
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(begin_copy)
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    // Key of the copy job record.
    std::string job_key;
    CopyJobKeyInfo job_key_info {instance_id, request->stage_id(), request->table_id(),
                                 request->copy_id(), request->group_id()};
    copy_job_key(job_key_info, &job_key);

    // Value of the copy job record; object files are appended below.
    CopyJobPB copy_job;
    copy_job.set_stage_type(request->stage_type());
    copy_job.set_job_status(CopyJobPB::LOADING);
    copy_job.set_start_time_ms(request->start_time_ms());
    copy_job.set_timeout_time_ms(request->timeout_time_ms());

    // Copy file kvs to write once the whole request has been examined.
    std::vector<std::pair<std::string, std::string>> pending_file_kvs;
    // Running totals over the new (not yet recorded) files seen so far.
    int file_num = 0;
    size_t file_size = 0;
    size_t file_meta_size = 0;
    for (const auto& file : request->object_files()) {
        // 1. look up the copy file kv to see whether the file is loading or loaded
        std::string file_key;
        CopyFileKeyInfo file_key_info {instance_id, request->stage_id(), request->table_id(),
                                       file.relative_path(), file.etag()};
        copy_file_key(file_key_info, &file_key);
        std::string existing_val;
        const TxnErrorCode probe = txn->get(file_key, &existing_val);
        if (probe != TxnErrorCode::TXN_OK && probe != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = cast_as<ErrCategory::READ>(probe);
            msg = fmt::format("failed to get copy file, err={}", probe);
            LOG(WARNING) << msg;
            return;
        }
        if (probe == TxnErrorCode::TXN_OK) {
            // already recorded, skip it
            continue;
        }

        // 2. stop once any limit is exceeded (never for the first new file)
        ++file_num;
        file_size += file.size();
        file_meta_size += file.ByteSizeLong();
        if (file_num > 1) {
            const bool over_num =
                    request->file_num_limit() > 0 && file_num > request->file_num_limit();
            const bool over_size =
                    request->file_size_limit() > 0 && file_size > request->file_size_limit();
            const bool over_meta = request->file_meta_size_limit() > 0 &&
                                   file_meta_size > request->file_meta_size_limit();
            if (over_num || over_size || over_meta) {
                break;
            }
        }

        // 3. stage the copy file kv
        CopyFilePB copy_file;
        copy_file.set_copy_id(request->copy_id());
        copy_file.set_group_id(request->group_id());
        std::string file_val = copy_file.SerializeAsString();
        if (file_val.empty()) {
            msg = "failed to serialize";
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            return;
        }
        pending_file_kvs.emplace_back(std::move(file_key), std::move(file_val));

        // 4. record the file in the copy job and in the response
        copy_job.add_object_files()->CopyFrom(file);
        response->add_filtered_object_files()->CopyFrom(file);
    }

    // No new file at all: nothing to write.
    if (file_num == 0) {
        return;
    }

    std::string job_val = copy_job.SerializeAsString();
    if (job_val.empty()) {
        msg = "failed to serialize";
        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
        return;
    }
    // put copy job
    txn->put(job_key, job_val);
    LOG(INFO) << "put copy_job_key=" << hex(job_key);
    // put copy files
    for (const auto& kv : pending_file_kvs) {
        txn->put(kv.first, kv.second);
        LOG(INFO) << "put copy_file_key=" << hex(kv.first);
    }

    err = txn->commit();
    if (err == TxnErrorCode::TXN_OK) {
        return;
    }
    code = cast_as<ErrCategory::COMMIT>(err);
    msg = fmt::format("failed to commit kv txn, err={}", err);
    LOG(WARNING) << msg;
}

/// Finishes the copy job addressed by (stage_id, table_id, copy_id, group_id).
///
/// Depending on the requested action:
///  - COMMIT: sets the stored CopyJobPB to FINISH (and records finish_time_ms when the
///    request carries it), then writes the job back.
///  - ABORT / REMOVE: removes the copy job kv and the copy file kv of every object file
///    listed in the stored job.
///  - anything else: fails with UNDEFINED_ERR.
///
/// Failures are reported via `code` / `msg`. Those, as well as `instance_id`, `txn` and
/// `ss`, are not declared in this function -- presumably RPC_PREPROCESS provides them.
void MetaServiceImpl::finish_copy(google::protobuf::RpcController* controller,
                                  const FinishCopyRequest* request, FinishCopyResponse* response,
                                  ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(finish_copy, get, put, del);
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(finish_copy)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    // Load the stored copy job.
    CopyJobKeyInfo job_key_info {instance_id, request->stage_id(), request->table_id(),
                                 request->copy_id(), request->group_id()};
    std::string job_key;
    std::string job_val;
    copy_job_key(job_key_info, &job_key);
    err = txn->get(job_key, &job_val);
    LOG(INFO) << "get copy_job_key=" << hex(job_key);

    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = MetaServiceCode::COPY_JOB_NOT_FOUND;
        ss << "copy job does not found";
        msg = ss.str();
        return;
    }
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "failed to get copy_job, instance_id=" << instance_id << " err=" << err;
        msg = ss.str();
        return;
    }

    CopyJobPB copy_job;
    if (!copy_job.ParseFromString(job_val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse CopyJobPB";
        return;
    }

    switch (request->action()) {
    case FinishCopyRequest::COMMIT: {
        // Mark the job as finished and persist the updated value.
        copy_job.set_job_status(CopyJobPB::FINISH);
        if (request->has_finish_time_ms()) {
            copy_job.set_finish_time_ms(request->finish_time_ms());
        }
        job_val = copy_job.SerializeAsString();
        if (job_val.empty()) {
            msg = "failed to serialize";
            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
            return;
        }
        txn->put(job_key, job_val);
        LOG(INFO) << "put copy_job_key=" << hex(job_key);
        break;
    }
    case FinishCopyRequest::ABORT:
    case FinishCopyRequest::REMOVE: {
        // Drop the job kv first, then the kv of each file the job references.
        txn->remove(job_key);
        LOG(INFO) << (request->action() == FinishCopyRequest::ABORT ? "abort" : "remove")
                  << " copy_job_key=" << hex(job_key);
        for (const auto& object_file : copy_job.object_files()) {
            CopyFileKeyInfo file_key_info {instance_id, request->stage_id(), request->table_id(),
                                           object_file.relative_path(), object_file.etag()};
            std::string file_key;
            copy_file_key(file_key_info, &file_key);
            txn->remove(file_key);
            LOG(INFO) << "remove copy_file_key=" << hex(file_key);
        }
        break;
    }
    default:
        msg = "Unhandled action";
        code = MetaServiceCode::UNDEFINED_ERR;
        return;
    }

    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::COMMIT>(err);
        msg = fmt::format("failed to commit kv txn, err={}", err);
        LOG(WARNING) << msg;
    }
}

/// Looks up the copy job addressed by (stage_id, table_id, copy_id, group_id) and, when it
/// exists, copies it into `response->copy_job`.
///
/// A missing job is not an error: the function returns without touching `code`, `msg` or
/// the response. Read and parse failures are reported through `code` / `msg`.
///
/// `code`, `msg`, `instance_id` and `txn` are not declared in this function -- presumably
/// RPC_PREPROCESS provides them.
void MetaServiceImpl::get_copy_job(google::protobuf::RpcController* controller,
                                   const GetCopyJobRequest* request, GetCopyJobResponse* response,
                                   ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_copy_job, get);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    // Apply the per-RPC rate limit once the instance is known, like the other copy RPCs
    // (finish_copy, get_copy_files, filter_copy_files) do.
    RPC_RATE_LIMIT(get_copy_job)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    CopyJobKeyInfo key_info {instance_id, request->stage_id(), request->table_id(),
                             request->copy_id(), request->group_id()};
    std::string key;
    copy_job_key(key_info, &key);
    std::string val;
    err = txn->get(key, &val);
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // not found key
        // No such job: leave the response without a copy_job.
        return;
    } else if (err != TxnErrorCode::TXN_OK) { // error
        code = cast_as<ErrCategory::READ>(err);
        msg = fmt::format("failed to get copy job, err={}", err);
        LOG(WARNING) << msg << " err=" << err;
        return;
    }
    CopyJobPB copy_job;
    if (!copy_job.ParseFromString(val)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        msg = "failed to parse CopyJobPB";
        return;
    }
    response->mutable_copy_job()->CopyFrom(copy_job);
}

/// Collects the object files of every copy job stored for (stage_id, table_id) and appends
/// them to `response->object_files`.
///
/// The jobs are read with a range scan over the copy job keys between
/// (stage_id, table_id, "", 0) and (stage_id, table_id + 1, "", 0). The scan is repeated
/// for as long as the iterator reports that more data is available.
///
/// Read and parse failures are reported through `code` / `msg`, which -- like
/// `instance_id` and `txn` -- are presumably declared by RPC_PREPROCESS.
void MetaServiceImpl::get_copy_files(google::protobuf::RpcController* controller,
                                     const GetCopyFilesRequest* request,
                                     GetCopyFilesResponse* response,
                                     ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_copy_files, get);
    std::string cloud_unique_id;
    if (request->has_cloud_unique_id()) {
        cloud_unique_id = request->cloud_unique_id();
    }
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(get_copy_files)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    // Range bounds: first possible job key of this table and of the next table id.
    CopyJobKeyInfo begin_info {instance_id, request->stage_id(), request->table_id(), "", 0};
    CopyJobKeyInfo end_info {instance_id, request->stage_id(), request->table_id() + 1, "", 0};
    std::string scan_begin;
    std::string scan_end;
    copy_job_key(begin_info, &scan_begin);
    copy_job_key(end_info, &scan_end);

    std::unique_ptr<RangeGetIterator> it;
    for (;;) {
        err = txn->get(scan_begin, scan_end, &it);
        if (err != TxnErrorCode::TXN_OK) {
            code = cast_as<ErrCategory::READ>(err);
            msg = fmt::format("failed to get copy jobs, err={}", err);
            LOG(WARNING) << msg << " err=" << err;
            return;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();
            // Remember the last key of this batch so the next scan can resume after it.
            const bool last_in_batch = !it->has_next();
            if (last_in_batch) {
                scan_begin = k;
            }
            CopyJobPB job;
            if (!job.ParseFromArray(v.data(), v.size())) {
                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
                msg = "failed to parse CopyJobPB";
                return;
            }
            // TODO check if job is timeout
            for (const auto& object_file : job.object_files()) {
                *response->add_object_files() = object_file;
            }
        }
        // Appending a zero byte makes the next range start strictly after the last key read.
        scan_begin.push_back('\x00');
        if (!it->more()) {
            break;
        }
    }
}

/// Filters the request's object files down to the ones that have no copy file kv yet.
///
/// For each file in `request->object_files` the copy file key built from
/// (stage_id, table_id, relative_path, etag) is looked up:
///  - key found: the file is skipped;
///  - key not found: the file is appended to `response->object_files`;
///  - any other kv error: the RPC fails with the matching READ error code.
///
/// `code`, `msg`, `instance_id` and `txn` are not declared in this function -- presumably
/// RPC_PREPROCESS provides them.
void MetaServiceImpl::filter_copy_files(google::protobuf::RpcController* controller,
                                        const FilterCopyFilesRequest* request,
                                        FilterCopyFilesResponse* response,
                                        ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(filter_copy_files, get);
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    if (cloud_unique_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud unique id not set";
        return;
    }

    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "empty instance_id";
        LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id;
        return;
    }
    RPC_RATE_LIMIT(filter_copy_files)

    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        code = cast_as<ErrCategory::CREATE>(err);
        msg = "failed to create txn";
        LOG(WARNING) << msg << " err=" << err;
        return;
    }

    for (const auto& file : request->object_files()) {
        // 1. get copy file kv to check if file is loading or loaded
        CopyFileKeyInfo file_key_info {instance_id, request->stage_id(), request->table_id(),
                                       file.relative_path(), file.etag()};
        std::string file_key;
        copy_file_key(file_key_info, &file_key);
        std::string file_val;
        err = txn->get(file_key, &file_val);
        if (err == TxnErrorCode::TXN_OK) { // found key
            continue;
        }
        if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { // error
            // Report the failure through the status code as well; setting only `msg`
            // would let the caller see a successful RPC with a truncated file list.
            code = cast_as<ErrCategory::READ>(err);
            msg = fmt::format("failed to get copy file, err={}", err);
            LOG(WARNING) << msg << " err=" << err;
            return;
        }
        // 2. no copy file kv: hand the file back to the caller
        response->add_object_files()->CopyFrom(file);
    }
}

/// Reports the status of the COMPUTE clusters of one or more instances.
///
/// The instances are taken from `request->instance_ids` when given; otherwise each entry
/// of `request->cloud_unique_ids` is resolved to an instance id (unresolvable ids are
/// logged and skipped). For every instance the InstanceInfoPB is read in its own
/// transaction and one detail entry is added to the response, provided at least one
/// cluster survives the optional `request->status` filter. Per-instance failures
/// (txn creation, read, parse) are only logged; they do not fail the RPC.
///
/// On the normal path `msg` is set to the JSON rendering of the response.
/// `code` and `msg` are not declared in this function -- presumably RPC_PREPROCESS
/// provides them.
void MetaServiceImpl::get_cluster_status(google::protobuf::RpcController* controller,
                                         const GetClusterStatusRequest* request,
                                         GetClusterStatusResponse* response,
                                         ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_cluster_status, get);
    const auto& req_instance_ids = request->instance_ids();
    const auto& req_unique_ids = request->cloud_unique_ids();
    if (req_instance_ids.empty() && req_unique_ids.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "cloud_unique_ids or instance_ids must be given, instance_ids.size: " +
              std::to_string(req_instance_ids.size()) +
              " cloud_unique_ids.size: " + std::to_string(req_unique_ids.size());
        return;
    }

    std::vector<std::string> instance_ids;
    instance_ids.reserve(std::max(req_instance_ids.size(), req_unique_ids.size()));

    // priority use instance_ids
    if (!req_instance_ids.empty()) {
        for (const auto& id : req_instance_ids) {
            instance_ids.emplace_back(id);
        }
    } else if (!req_unique_ids.empty()) {
        for (const auto& unique_id : req_unique_ids) {
            std::string resolved_id = get_instance_id(resource_mgr_, unique_id);
            if (resolved_id.empty()) {
                LOG(INFO) << "cant get instance_id from cloud_unique_id : " << unique_id;
                continue;
            }
            instance_ids.emplace_back(resolved_id);
        }
    }

    if (instance_ids.empty()) {
        LOG(INFO) << "can't get valid instanceids";
        return;
    }
    const bool has_filter = request->has_status();

    RPC_RATE_LIMIT(get_cluster_status)

    // Reads one instance and appends its (filtered) compute clusters to the response.
    auto collect_clusters = [this, request, response, has_filter](const std::string& id) {
        InstanceKeyInfo key_info {id};
        std::string key;
        std::string val;
        instance_key(key_info, &key);

        std::unique_ptr<Transaction> read_txn;
        TxnErrorCode err = txn_kv_->create_txn(&read_txn);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to create txn err=" << err;
            return;
        }
        // Publish the kv read metrics of this transaction when leaving the lambda.
        DORIS_CLOUD_DEFER {
            if (config::use_detailed_metrics && read_txn != nullptr) {
                g_bvar_rpc_kv_get_cluster_status_get_bytes.put({id}, read_txn->get_bytes());
                g_bvar_rpc_kv_get_cluster_status_get_counter.put({id},
                                                                 read_txn->num_get_keys());
            }
        };
        err = read_txn->get(key, &val);
        LOG(INFO) << "get instance_key=" << hex(key);

        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get instance, instance_id=" << id << " err=" << err;
            return;
        }
        InstanceInfoPB instance;
        if (!instance.ParseFromString(val)) {
            LOG(WARNING) << "failed to parse InstanceInfoPB";
            return;
        }

        GetClusterStatusResponse::GetClusterStatusResponseDetail detail;
        detail.set_instance_id(id);
        for (const auto& cluster : instance.clusters()) {
            if (cluster.type() != ClusterPB::COMPUTE) {
                continue;
            }
            // NOTE(review): the filter compares against the raw cluster_status() even when
            // the field is unset, while the output below reports NORMAL in that case --
            // confirm this asymmetry is intended.
            if (has_filter && request->status() != cluster.cluster_status()) {
                continue;
            }
            ClusterPB summary;
            summary.set_cluster_name(cluster.cluster_name());
            summary.set_cluster_id(cluster.cluster_id());
            // for compatible
            summary.set_cluster_status(cluster.has_cluster_status() ? cluster.cluster_status()
                                                                    : ClusterStatus::NORMAL);
            detail.add_clusters()->CopyFrom(summary);
        }
        if (detail.clusters().empty()) {
            return;
        }
        response->add_details()->CopyFrom(detail);
    };

    for (const auto& id : instance_ids) {
        collect_clusters(id);
    }

    msg = proto_to_json(*response);
}

/// Asks the meta-service nodes listed in the service registry to refresh `instance_id`.
///
/// The registry is read from the kv store under system_meta_service_registry_key(). For
/// every registered endpoint an `alter_instance(REFRESH)` RPC is issued from its own
/// background bthread, with up to 3 attempts per endpoint. The function joins all started
/// bthreads before returning. Nothing is done when
/// config::enable_notify_instance_update is off.
///
/// @param txn_kv        kv store used to read the service registry
/// @param instance_id   instance to refresh on the remote nodes
/// @param stats         optional; when non-null, the registry read is added to its
///                      get_counter / get_bytes
/// @param include_self  when false, the endpoint equal to this process' own
///                      "<hostname or local ip>:<brpc_listen_port>" is skipped
///
/// All failures are logged only; the function has no return value.
void notify_refresh_instance(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
                             KVStats* stats, bool include_self) {
    if (!config::enable_notify_instance_update) {
        LOG(INFO) << "notify_refresh_instance is disabled";
        return;
    }

    LOG(INFO) << "begin notify_refresh_instance, include_self=" << include_self;
    TEST_SYNC_POINT_RETURN_WITH_VOID("notify_refresh_instance_return");
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn err=" << err;
        return;
    }
    std::string key = system_meta_service_registry_key();
    std::string val;
    err = txn->get(key, &val);
    if (stats) {
        stats->get_counter++;
    }
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to get server registry"
                     << " err=" << err;
        return;
    }
    if (stats) {
        stats->get_bytes += val.size() + key.size();
    }
    std::string self_endpoint =
            config::hostname.empty() ? get_local_ip(config::priority_networks) : config::hostname;
    self_endpoint = fmt::format("{}:{}", self_endpoint, config::brpc_listen_port);
    ServiceRegistryPB reg;
    if (!reg.ParseFromString(val)) {
        // A registry that does not parse cannot be trusted to name the right endpoints.
        LOG(WARNING) << "failed to parse ServiceRegistryPB";
        return;
    }

    brpc::ChannelOptions options;
    options.connection_type = brpc::ConnectionType::CONNECTION_TYPE_SHORT;

    // NOTE(review): this cache is consulted below but nothing ever inserts into it, so a
    // new channel is created for every endpoint on every call. Left unchanged on purpose:
    // caching would pin whatever address Channel::Init resolved -- confirm the intent
    // before populating it.
    static std::unordered_map<std::string, std::shared_ptr<MetaService_Stub>> stubs;
    static std::mutex mtx;

    std::vector<bthread_t> btids;
    btids.reserve(reg.items_size());
    for (const auto& item : reg.items()) {
        int ret = 0;
        const std::string endpoint = item.has_host()
                                             ? fmt::format("{}:{}", item.host(), item.port())
                                             : fmt::format("{}:{}", item.ip(), item.port());
        if (endpoint == self_endpoint && !include_self) continue;

        // Prepare stub
        std::shared_ptr<MetaService_Stub> stub;
        do {
            std::lock_guard l(mtx);
            if (auto it = stubs.find(endpoint); it != stubs.end()) {
                stub = it->second;
                break;
            }
            auto channel = std::make_unique<brpc::Channel>();
            ret = channel->Init(endpoint.c_str(), &options);
            if (ret != 0) {
                LOG(WARNING) << "fail to init brpc channel, endpoint=" << endpoint;
                break;
            }
            stub = std::make_shared<MetaService_Stub>(channel.release(),
                                                      google::protobuf::Service::STUB_OWNS_CHANNEL);
        } while (false);
        if (ret != 0) continue;

        // Issue RPC
        auto task = std::make_unique<std::function<void()>>([instance_id, stub, endpoint] {
            constexpr int kMaxTries = 3;
            for (int num_try = 1; num_try <= kMaxTries; ++num_try) {
                brpc::Controller cntl;
                cntl.set_timeout_ms(3000);
                AlterInstanceRequest req;
                AlterInstanceResponse res;
                req.set_instance_id(instance_id);
                req.set_op(AlterInstanceRequest::REFRESH);
                stub->alter_instance(&cntl, &req, &res, nullptr);
                if (cntl.Failed()) {
                    LOG_WARNING("issue refresh instance rpc")
                            .tag("endpoint", endpoint)
                            .tag("num_try", num_try)
                            .tag("code", cntl.ErrorCode())
                            .tag("msg", cntl.ErrorText());
                } else {
                    const bool succ = res.status().code() == MetaServiceCode::OK;
                    LOG(INFO) << (succ ? "succ" : "failed")
                              << " to issue refresh_instance rpc, num_try=" << num_try
                              << " endpoint=" << endpoint << " response=" << proto_to_json(res);
                    if (succ) return;
                }
                // Back off between attempts only; there is nothing to wait for after the
                // last one.
                if (num_try < kMaxTries) bthread_usleep(300000);
            }
            LOG(WARNING) << "failed to refresh finally, it may left the system inconsistent,"
                         << " tried=" << kMaxTries;
        });
        bthread_t bid;
        ret = bthread_start_background(&bid, nullptr, run_bthread_work, task.get());
        if (ret != 0) {
            // The task was never handed to a bthread; `task` releases it at scope exit.
            LOG(WARNING) << "failed to start bthread for refresh instance, endpoint=" << endpoint
                         << " ret=" << ret;
            continue;
        }
        // The started bthread now holds the raw pointer (presumably run_bthread_work
        // frees it -- its definition is not visible here), so give up ownership.
        (void)task.release();
        btids.emplace_back(bid);
    } // for
    for (auto& i : btids) bthread_join(i, nullptr);
    LOG(INFO) << "finish notify_refresh_instance, num_items=" << reg.items_size();
}

} // namespace doris::cloud
